diff options
author | drebs <drebs@leap.se> | 2015-02-04 15:04:10 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2015-05-27 14:37:27 -0300 |
commit | 514c1434a016b09d93e8dfc5578b14825d14005a (patch) | |
tree | c4bacce1df24a81b2de3d1343dac26eb56e30ac7 /src | |
parent | 71c750ef9c3e53ef416d1de6e85458f16ca48d74 (diff) |
[feat] refactor events to use ZMQ
Before this commit, protobuf and protobuf.socketrpc were used to serialize and
transmit messages between events clients. This change implements a simpler ZMQ
client/server events mechanism that uses ZMQ sockets for transmitting messages
from clients to server and to redistribute such messages to subscribed
clients.
Closes: #6359
Diffstat (limited to 'src')
-rw-r--r-- | src/leap/common/events/Makefile | 31 | ||||
-rw-r--r-- | src/leap/common/events/README.rst | 117 | ||||
-rw-r--r-- | src/leap/common/events/__init__.py | 315 | ||||
-rw-r--r-- | src/leap/common/events/catalog.py | 88 | ||||
-rw-r--r-- | src/leap/common/events/client.py | 712 | ||||
-rw-r--r-- | src/leap/common/events/daemon.py | 208 | ||||
-rw-r--r-- | src/leap/common/events/errors.py (renamed from src/leap/common/events/mac_auth.py) | 18 | ||||
-rw-r--r-- | src/leap/common/events/events.proto | 145 | ||||
-rw-r--r-- | src/leap/common/events/events_pb2.py | 635 | ||||
-rw-r--r-- | src/leap/common/events/server.py | 242 | ||||
-rw-r--r-- | src/leap/common/events/txclient.py | 185 | ||||
-rw-r--r-- | src/leap/common/events/zmq_components.py | 179 | ||||
-rw-r--r-- | src/leap/common/tests/test_events.py | 468 | ||||
-rw-r--r-- | src/leap/common/zmq_utils.py | 105 |
14 files changed, 1420 insertions, 2028 deletions
diff --git a/src/leap/common/events/Makefile b/src/leap/common/events/Makefile deleted file mode 100644 index 5b7e60d..0000000 --- a/src/leap/common/events/Makefile +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- -# Makefile -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -# This file is used to generate protobuf python files that are used for IPC: -# -# https://developers.google.com/protocol-buffers/docs/pythontutorial - -PROTOC = protoc - -all: events_pb2.py - -%_pb2.py: %.proto - $(PROTOC) --python_out=./ $< -# autopep8 --in-place --aggressive $@ - -clean: - rm -f *_pb2.py diff --git a/src/leap/common/events/README.rst b/src/leap/common/events/README.rst index 2e7f254..f455cc8 100644 --- a/src/leap/common/events/README.rst +++ b/src/leap/common/events/README.rst @@ -1,19 +1,83 @@ Events mechanism ================ -The events mechanism allows for clients to send signal events to each -other by means of a centralized server. Clients can register with the -server to receive signals of certain types, and they can also send signals to -the server that will then redistribute these signals to registered clients. +The events mechanism allows for clients to send events to each other by means +of a centralized server. Clients can register with the server to receive +events of certain types, and they can also send events to the server that will +then redistribute these events to registered clients. -Listening daemons ------------------ +ZMQ connections and events redistribution +----------------------------------------- -Both clients and the server listen for incoming messages by using a -listening daemon that runs in its own thread. The server daemon has to be -started explicitly, while clients daemon will be started whenever a -client registers with the server to receive messages. +Clients and server use ZMQ connection patterns to communicate. Clients can +push events to the server, and may subscribe to events published by the +server. The server in turn pulls events from clients and publishes them to +subscribed clients. + +Clients connect to the server's zmq pub socket, and register to specific +events indicating which callbacks should be executed when that event is +received: + + + EventsServer + .------------. + |PULL PUB| + '------------' + ^^ + || + reg(1, cbk1) |'--------------. reg(2, cbk2) + | | + | | + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + + +A client that wants to send an event connects to the server's zmq pull socket +and pushes the event to the server. The server then redistributes it to all +clients subscribed to that event. + + + EventsServer + .------------. + |PULL---->PUB| + '------------' + ^ |. + | |. +sig(1, 'foo') .----------------' |'............... + | | . + | v . + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + | + v + cbk1(1, 'foo') + + +Any client may emit or subscribe to an event. ZMQ will manage sockets and +reuse the connection whenever it can. + + + EventsServer + .------------. + |PULL---->PUB| + '------------' + ^ .| + | .| +sig(2, 'bar') .-----------------' .'--------------. + | . | + | . v + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + | + v + cbk2(2, 'bar') How to use it @@ -22,32 +86,27 @@ How to use it To start the events server: >>> from leap.common.events import server ->>> server.ensure_server(port=8090) +>>> server.ensure_server( + emit_addr="tcp://127.0.0.1:9000", + reg_addr="tcp://127.0.0.1:9001") -To register a callback to be called when a given signal is raised: +To register a callback to be called when a given event is raised: ->>> from leap.common.events import ( ->>> register, ->>> events_pb2 as proto, ->>> ) +>>> from leap.common.events import register +>>> from leap.common.events import catalog >>> ->>> def mycallback(sigreq): ->>> print str(sigreq) +>>> def mycbk(event, *content): +>>> print "%s, %s" (str(event), str(content)) >>> ->>> events.register(signal=proto.CLIENT_UID, callback=mycallback) +>>> register(catalog.CLIENT_UID, callback=mycbk) -To signal an event: +To emit an event: ->>> from leap.common.events import ( ->>> signal, ->>> events_pb2 as proto, ->>> ) ->>> signal(proto.CLIENT_UID) +>>> from leap.common.events import emit +>>> from leap.common.events import catalog +>>> emit(catalog.CLIENT_UID) Adding events ------------- -* Add the new event under enum ``Event`` in ``events.proto`` -* Compile the new protocolbuffers file:: - - make +To add a new event, just add it to ``catalog.py``. diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py index 0cc6573..9269b9a 100644 --- a/src/leap/common/events/__init__.py +++ b/src/leap/common/events/__init__.py @@ -15,188 +15,199 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. + """ -This is an events mechanism that uses a server to allow for signaling of -events between clients. +This is an events mechanism that uses a server to allow for emitting events +between clients. Application components should use the interface available in this file to -register callbacks to be executed upon receiving specific signals, and to send -signals to other components. +register callbacks to be executed upon receiving specific events, and to send +events to other components. -To register a callback to be executed when a specific event is signaled, use +To register a callback to be executed when a specific event is emitted, use leap.common.events.register(): >>> from leap.common.events import register ->>> from leap.common.events import events_pb2 as proto ->>> register(proto.CLIENT_UID, lambda req: do_something(req)) - -To signal an event, use leap.common.events.signal(): - ->>> from leap.common.events import signal ->>> from leap.common.events import events_pb2 as proto ->>> signal(proto.CLIENT_UID) - - -NOTE ABOUT SYNC/ASYNC REQUESTS: - -Clients always communicate with the server, and never between themselves. -When a client registers a callback for an event, the callback is saved locally -in the client and the server stores the client socket port in a list -associated with that event. When a client signals an event, the server -forwards the signal to all registered client ports, and then each client -executes its callbacks associated with that event locally. - -Each RPC call from a client to the server is followed by a response from the -server to the client. Calls to register() and signal() (and all other RPC -calls) can be synchronous or asynchronous meaning if they will or not wait for -the server's response before returning. - -This mechanism was built on top of protobuf.socketrpc, and because of this RPC -calls are made synchronous or asynchronous in the following way: - - * If RPC calls receive a parameter called `reqcbk`, then the call is made - asynchronous. That means that: +>>> from leap.common.events import catalog +>>> register(catalog.CLIENT_UID, lambda sig, content: do_something(content)) - - an eventual `timeout` parameter is not used, - - the call returns immediatelly with value None, and - - the `reqcbk` callback is executed asynchronously upon the arrival of - a response from the server. +To emit an event, use leap.common.events.emit(): - * Otherwise, if the `reqcbk` parameter is None, then the call is made in a - synchronous manner: - - - if a response from server arrives within `timeout` milliseconds, the - RPC call returns it; - - otherwise, the call returns None. +>>> from leap.common.events import emit +>>> from leap.common.events import catalog +>>> emit(catalog.CLIENT_UID) """ + import logging -import socket +import argparse + +from leap.common.events import client +from leap.common.events import server +from leap.common.events import catalog -from leap.common.events import ( - events_pb2 as proto, - server, - client, - daemon, -) +__all__ = [ + "register", + "unregister", + "emit", + "catalog", +] logger = logging.getLogger(__name__) -def register(signal, callback, uid=None, replace=False, reqcbk=None, - timeout=1000): +def register(event, callback, uid=None, replace=False): """ - Register a callback to be called when the given signal is received. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param callback: the callback to be called when the signal is received - :type callback: function - :param uid: a unique id for the callback - :type uid: int - :param replace: should an existent callback with same uid be replaced? + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. :type replace: bool - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(leap.common.events.events_pb2.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - return client.register(signal, callback, uid, replace, reqcbk, timeout) + :return: The callback uid. + :rtype: str -def unregister(signal, uid=None, reqcbk=None, timeout=1000): - """ - Unregister a callback. - - If C{uid} is specified, unregisters only the callback identified by that - unique id. Otherwise, unregisters all callbacks registered for C{signal}. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param uid: a unique id for the callback - :type uid: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None + :raises CallbackAlreadyRegistered: when there's already a callback + identified by the given uid and replace is False. """ - return client.unregister(signal, uid, reqcbk, timeout) + return client.register(event, callback, uid, replace) -def signal(signal, content="", mac_method="", mac="", reqcbk=None, - timeout=1000): +def unregister(event, uid=None): """ - Send `signal` event to events server. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param content: the contents of the event signal - :type content: str - :param mac_method: the method used to auth mac - :type mac_method: str - :param mac: the content of the auth mac - :type mac: str - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.SignalRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - return client.signal(signal, content, mac_method, mac, reqcbk, timeout) + Unregister callbacks for an event. -def ping_client(port, reqcbk=None, timeout=1000): - """ - Ping a client running in C{port}. - - :param port: the port in which the client should be listening - :type port: int - :param reqcbk: a callback to be called when a response from client is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str """ - return client.ping(port, reqcbk=reqcbk, timeout=timeout) + return client.unregister(event, uid) -def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000): +def emit(event, *content): """ - Ping the server. - - :param port: the port in which server should be listening - :type port: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list """ - return server.ping(port, reqcbk=reqcbk, timeout=timeout) + return client.emit(event, *content) + + +if __name__ == "__main__": + + def _echo(event, *content): + print "Received event: (%s, %s)" % (event, content) + + def _parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--debug", "-d", action="store_true", + help="print debug information") + + subparsers = parser.add_subparsers(dest="command") + + # server options + server_parser = subparsers.add_parser( + "server", help="Run an events server.") + server_parser.add_argument( + "--emit-addr", + help="The address in which to listen for events", + default=server.EMIT_ADDR) + server_parser.add_argument( + "--reg-addr", + help="The address in which to listen for registration for events.", + default=server.REG_ADDR) + + # client options + client_parser = subparsers.add_parser( + "client", help="Run an events client.") + client_parser.add_argument( + "--emit-addr", + help="The address in which to emit events.", + default=server.EMIT_ADDR) + client_parser.add_argument( + "--reg-addr", + help="The address in which to register for events.", + default=server.REG_ADDR) + group = client_parser.add_mutually_exclusive_group(required=True) + group.add_argument('--reg', help="register an event") + group.add_argument('--emit', help="send an event") + client_parser.add_argument( + '--content', help="the content of the event", default=None) + + # txclient options + txclient_parser = subparsers.add_parser( + "txclient", help="Run an events twisted client.") + txclient_parser.add_argument( + "--emit-addr", + help="The address in which to emit events.", + default=server.EMIT_ADDR) + txclient_parser.add_argument( + "--reg-addr", + help="The address in which to register for events.", + default=server.REG_ADDR) + group = txclient_parser.add_mutually_exclusive_group(required=True) + group.add_argument('--reg', help="register an event") + group.add_argument('--emit', help="send an event") + txclient_parser.add_argument( + '--content', help="the content of the event", default=None) + + return parser.parse_args() + + args = _parse_args() + + if args.debug: + logging.basicConfig(level=logging.DEBUG) + + if args.command == "server": + # run server + server.ensure_server(emit_addr=args.emit_addr, reg_addr=args.reg_addr) + from twisted.internet import reactor + reactor.run() + elif args.command == "client": + if args.reg: + event = getattr(catalog, args.reg) + # run client and register to a signal + register(event, _echo) + # make sure we stop on CTRL+C + import signal + signal.signal( + signal.SIGINT, lambda sig, frame: client.shutdown()) + # wait until client thread dies + import time + while client.EventsClientThread.instance().is_alive(): + time.sleep(0.1) + if args.emit: + # run client and emit a signal + event = getattr(catalog, args.emit) + emit(event, args.content) + client.shutdown() + elif args.command == "txclient": + from leap.common.events import txclient + register = txclient.register + emit = txclient.emit + if args.reg: + event = getattr(catalog, args.reg) + # run client and register to a signal + register(event, _echo) + from twisted.internet import reactor + reactor.run() + if args.emit: + # run client and emit a signal + event = getattr(catalog, args.emit) + emit(event, args.content) diff --git a/src/leap/common/events/catalog.py b/src/leap/common/events/catalog.py new file mode 100644 index 0000000..8bddd2c --- /dev/null +++ b/src/leap/common/events/catalog.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# catalog.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful", +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Events catalog. +""" + + +EVENTS = [ + "CLIENT_SESSION_ID", + "CLIENT_UID", + "IMAP_CLIENT_LOGIN", + "IMAP_SERVICE_FAILED_TO_START", + "IMAP_SERVICE_STARTED", + "IMAP_UNHANDLED_ERROR", + "KEYMANAGER_DONE_UPLOADING_KEYS", + "KEYMANAGER_FINISHED_KEY_GENERATION", + "KEYMANAGER_KEY_FOUND", + "KEYMANAGER_KEY_NOT_FOUND", + "KEYMANAGER_LOOKING_FOR_KEY", + "KEYMANAGER_STARTED_KEY_GENERATION", + "MAIL_FETCHED_INCOMING", + "MAIL_MSG_DECRYPTED", + "MAIL_MSG_DELETED_INCOMING", + "MAIL_MSG_PROCESSING", + "MAIL_MSG_SAVED_LOCALLY", + "MAIL_UNREAD_MESSAGES", + "RAISE_WINDOW", + "SMTP_CONNECTION_LOST", + "SMTP_END_ENCRYPT_AND_SIGN", + "SMTP_END_SIGN", + "SMTP_RECIPIENT_ACCEPTED_ENCRYPTED", + "SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED", + "SMTP_RECIPIENT_REJECTED", + "SMTP_SEND_MESSAGE_ERROR", + "SMTP_SEND_MESSAGE_START", + "SMTP_SEND_MESSAGE_SUCCESS", + "SMTP_SERVICE_FAILED_TO_START", + "SMTP_SERVICE_STARTED", + "SMTP_START_ENCRYPT_AND_SIGN", + "SMTP_START_SIGN", + "SOLEDAD_CREATING_KEYS", + "SOLEDAD_DONE_CREATING_KEYS", + "SOLEDAD_DONE_DATA_SYNC", + "SOLEDAD_DONE_DOWNLOADING_KEYS", + "SOLEDAD_DONE_UPLOADING_KEYS", + "SOLEDAD_DOWNLOADING_KEYS", + "SOLEDAD_INVALID_AUTH_TOKEN", + "SOLEDAD_NEW_DATA_TO_SYNC", + "SOLEDAD_SYNC_RECEIVE_STATUS", + "SOLEDAD_SYNC_SEND_STATUS", + "SOLEDAD_UPLOADING_KEYS", + "UPDATER_DONE_UPDATING", + "UPDATER_NEW_UPDATES", +] + + +class Event(object): + + def __init__(self, label): + self.label = label + + def __repr__(self): + return '<Event: %s>' % self.label + + def __str__(self): + return self.label + + +# create local variables based on the event list above +lcl = locals() +for event in EVENTS: + lcl[event] = Event(event) diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 83f18e0..6b234a1 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # client.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014, 2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,6 +14,8 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. + + """ The client end point of the events mechanism. @@ -21,309 +23,501 @@ Clients are the communicating parties of the events mechanism. They communicate by sending messages to a server, which in turn redistributes messages to other clients. -When a client registers a callback for a given signal, it also tells the -server that it wants to be notified whenever signals of that type are sent by +When a client registers a callback for a given event, it also tells the +server that it wants to be notified whenever events of that type are sent by some other client. """ -import logging +import logging +import collections +import uuid +import threading +import time +import pickle +import os + +from abc import ABCMeta +from abc import abstractmethod + +import zmq +from zmq.eventloop import zmqstream +from zmq.eventloop import ioloop + +# XXX some distros don't package libsodium, so we have to be prepared for +# absence of zmq.auth +try: + import zmq.auth +except ImportError: + pass -from protobuf.socketrpc import RpcService +from leap.common.config import get_path_prefix +from leap.common.zmq_utils import zmq_has_curve +from leap.common.zmq_utils import maybe_create_and_get_certificates +from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX -from leap.common.events import events_pb2 as proto -from leap.common.events import server -from leap.common.events import daemon -from leap.common.events import mac_auth +from leap.common.events.errors import CallbackAlreadyRegisteredError +from leap.common.events.server import EMIT_ADDR +from leap.common.events.server import REG_ADDR +from leap.common.events import catalog logger = logging.getLogger(__name__) -# the `registered_callbacks` dictionary below should have the following -# format: -# -# { event_signal: [ (uid, callback), ... ], ... } -# -registered_callbacks = {} +_emit_addr = EMIT_ADDR +_reg_addr = REG_ADDR + +def configure_client(emit_addr, reg_addr): + global _emit_addr, _reg_addr + logger.debug("Configuring client with addresses: (%s, %s)" % + (emit_addr, reg_addr)) + _emit_addr = emit_addr + _reg_addr = reg_addr -class CallbackAlreadyRegistered(Exception): + +class EventsClient(object): """ - Raised when trying to register an already registered callback. + A singleton client for the events mechanism. """ - pass + __metaclass__ = ABCMeta -def ensure_client_daemon(): - """ - Ensure the client daemon is running and listening for incoming - messages. + _instance = None + _instance_lock = threading.Lock() - :return: the daemon instance - :rtype: EventsClientDaemon - """ - import time - daemon = EventsClientDaemon.ensure(0) - logger.debug('ensure client daemon') + def __init__(self, emit_addr, reg_addr): + """ + Initialize the events client. + """ + logger.debug("Creating client instance.") + self._callbacks = collections.defaultdict(dict) + self._emit_addr = emit_addr + self._reg_addr = reg_addr + + @property + def callbacks(self): + return self._callbacks - # Because we use a random port we want to wait until a port is assigned to - # local client daemon. + @classmethod + def instance(cls): + """ + Return a singleton EventsClient instance. + """ + with cls._instance_lock: + if cls._instance is None: + cls._instance = cls(_emit_addr, _reg_addr) + return cls._instance - while not (EventsClientDaemon.get_instance() and - EventsClientDaemon.get_instance().get_port()): - time.sleep(0.1) - return daemon + def register(self, event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: Event + :param callback: The callback to be executed. + :type callback: callable(event, *content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + logger.debug("Subscribing to event: %s" % event) + if not uid: + uid = uuid.uuid4() + elif uid in self._callbacks[event] and not replace: + raise CallbackAlreadyRegisteredError() + self._callbacks[event][uid] = callback + self._subscribe(str(event)) + return uid + + def unregister(self, event, uid=None): + """ + Unregister callbacks for an event. + If uid is not None, then only the callback identified by the given uid + is removed. Otherwise, all callbacks for the event are removed. -def register(signal, callback, uid=None, replace=False, reqcbk=None, - timeout=1000): - """ - Registers a callback to be called when a specific signal event is - received. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param callback: the callback to be called when the signal is received - :type callback: function(leap.common.events.events_pb2.SignalRequest) - :param uid: a unique id for the callback - :type uid: int - :param replace: should an existent callback with same uid be replaced? - :type replace: bool - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.RegisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - Might raise a CallbackAlreadyRegistered exception if there's already a - callback identified by the given uid and replace is False. - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - ensure_client_daemon() # so we can receive registered signals - # register callback locally - if signal not in registered_callbacks: - registered_callbacks[signal] = [] - cbklist = registered_callbacks[signal] - - # TODO should check that the callback has the right - # number of arguments. - - if uid and filter(lambda (x, y): x == uid, cbklist): - if not replace: - raise CallbackAlreadyRegistered() + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str + """ + if not uid: + logger.debug( + "Unregistering all callbacks from event %s." % event) + self._callbacks[event] = {} else: - registered_callbacks[signal] = filter(lambda(x, y): x != uid, - cbklist) - registered_callbacks[signal].append((uid, callback)) - # register callback on server - request = proto.RegisterRequest() - request.event = signal - request.port = EventsClientDaemon.get_instance().get_port() - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(proto.EventsServerService_Stub, - server.SERVER_PORT, 'localhost') - logger.debug( - "Sending registration request to server on port %s: %s", - server.SERVER_PORT, - str(request)[:40]) - return service.register(request, callback=reqcbk, timeout=timeout) - - -def unregister(signal, uid=None, reqcbk=None, timeout=1000): - """ - Unregister a callback. - - If C{uid} is specified, unregisters only the callback identified by that - unique id. Otherwise, unregisters all callbacks - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param uid: a unique id for the callback - :type uid: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls or None if no callback is registered for that signal or - uid. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - if signal not in registered_callbacks or not registered_callbacks[signal]: - logger.warning("No callback registered for signal %d." % signal) - return None - # unregister callback locally - cbklist = registered_callbacks[signal] - if uid is not None: - if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []: - logger.warning("No callback registered for uid %d." % st) - return None - registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist) - else: - # exclude all callbacks for given signal - registered_callbacks[signal] = [] - # unregister port in server if there are no more callbacks for this signal - if not registered_callbacks[signal]: - request = proto.UnregisterRequest() - request.event = signal - request.port = EventsClientDaemon.get_instance().get_port() - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(proto.EventsServerService_Stub, - server.SERVER_PORT, 'localhost') - logger.info( - "Sending unregistration request to server on port %s: %s", - server.SERVER_PORT, - str(request)[:40]) - return service.unregister(request, callback=reqcbk, timeout=timeout) - - -def signal(signal, content="", mac_method="", mac="", reqcbk=None, - timeout=1000): - """ - Send `signal` event to events server. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param content: the contents of the event signal - :type content: str - :param mac_method: the method used for auth mac - :type mac_method: str - :param mac: the content of the auth mac - :type mac: str - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.SignalRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - request = proto.SignalRequest() - request.event = signal - request.content = content - request.mac_method = mac_method - request.mac = mac - service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT, - 'localhost') - logger.debug("Sending signal to server: %s", str(request)[:40]) - return service.signal(request, callback=reqcbk, timeout=timeout) - - -def ping(port, reqcbk=None, timeout=1000): + logger.debug( + "Unregistering callback %s from event %s." % (uid, event)) + if uid in self._callbacks[event]: + del self._callbacks[event][uid] + if not self._callbacks[event]: + del self._callbacks[event] + self._unsubscribe(str(event)) + + def emit(self, event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + logger.debug("Sending event: (%s, %s)" % (event, content)) + self._send(str(event) + b'\0' + pickle.dumps(content)) + + def _handle_event(self, event, content): + """ + Handle an incoming event. + + :param msg: The incoming message. + :type msg: list(str) + """ + logger.debug("Handling event %s..." % event) + for uid in self._callbacks[event]: + callback = self._callbacks[event][uid] + logger.debug("Executing callback %s." % uid) + callback(event, *content) + + @abstractmethod + def _subscribe(self, tag): + """ + Subscribe to a tag on the zmq SUB socket. + + :param tag: The tag to be subscribed. + :type tag: str + """ + pass + + @abstractmethod + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. + + :param tag: The tag to be unsubscribed. + :type tag: str + """ + pass + + @abstractmethod + def _send(self, data): + """ + Send data through PUSH socket. + + :param data: The data to be sent. + :type event: str + """ + pass + + def shutdown(self): + self.__class__.reset() + + @classmethod + def reset(cls): + with cls._instance_lock: + cls._instance = None + + +class EventsIOLoop(ioloop.ZMQIOLoop): """ - Ping a client running in C{port}. - - :param port: the port in which the client should be listening - :type port: int - :param reqcbk: a callback to be called when a response from client is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from client for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None + An extension of zmq's ioloop that can wait until there are no callbacks + in the queue before stopping. """ - request = proto.PingRequest() - service = RpcService( - proto.EventsClientService_Stub, - port, - 'localhost') - logger.debug("Pinging a client in port %d..." % port) - return service.ping(request, callback=reqcbk, timeout=timeout) + def stop(self, wait=False): + """ + Stop the I/O loop. -class EventsClientService(proto.EventsClientService): + :param wait: Whether we should wait for callbacks in queue to finish + before stopping. + :type wait: bool + """ + if wait: + # prevent new callbacks from being added + with self._callback_lock: + self._closing = True + # wait until all callbacks have been executed + while self._callbacks: + time.sleep(0.1) + ioloop.ZMQIOLoop.stop(self) + + +class EventsClientThread(threading.Thread, EventsClient): """ - Service for receiving signal events in clients. + A threaded version of the events client. """ - def __init__(self): - proto.EventsClientService.__init__(self) + def __init__(self, emit_addr, reg_addr): + """ + Initialize the events client. + """ + threading.Thread.__init__(self) + EventsClient.__init__(self, emit_addr, reg_addr) + self._lock = threading.Lock() + self._initialized = threading.Event() + self._config_prefix = os.path.join( + get_path_prefix(), "leap", "events") + self._loop = None + self._context = None + self._push = None + self._sub = None + + def _init_zmq(self): + """ + Initialize ZMQ connections. + """ + self._loop = EventsIOLoop() + self._context = zmq.Context() + # connect SUB first, otherwise we might miss some event sent from this + # same client + self._sub = self._zmq_connect_sub() + self._push = self._zmq_connect_push() + + def _zmq_connect(self, socktype, address): + """ + Connect to an address using with a zmq socktype. + + :param socktype: The ZMQ socket type. + :type socktype: int + :param address: The address to connect to. + :type address: str - def signal(self, controller, request, done): + :return: A ZMQ connection stream. + :rtype: ZMQStream """ - Receive a signal and run callbacks registered for that signal. + logger.debug("Connecting %s to %s." % (socktype, address)) + socket = self._context.socket(socktype) + # configure curve authentication + if zmq_has_curve(): + public, private = maybe_create_and_get_certificates( + self._config_prefix, "client") + server_public_file = os.path.join( + self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) + socket.curve_publickey = public + socket.curve_secretkey = private + socket.curve_serverkey = server_public + stream = zmqstream.ZMQStream(socket, self._loop) + socket.connect(address) + return stream + + def _zmq_connect_push(self): + """ + Initialize the client's PUSH connection. - This method is called whenever a signal request is received from - server. + :return: A ZMQ connection stream. + :rtype: ZMQStream + """ + return self._zmq_connect(zmq.PUSH, self._emit_addr) - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.SignalRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback + def _zmq_connect_sub(self): """ - logger.debug('Received signal from server: %s...' % str(request)[:40]) + Initialize the client's SUB connection. - # run registered callbacks - # TODO: verify authentication using mac in incoming message - if request.event in registered_callbacks: - for (_, cbk) in registered_callbacks[request.event]: - # callbacks should be prepared to receive a - # events_pb2.SignalRequest. - cbk(request) + :return: A ZMQ connection stream. + :rtype: ZMQStream + """ + stream = self._zmq_connect(zmq.SUB, self._reg_addr) + stream.on_recv(self._on_recv) + return stream - # send response back to server - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) + def _on_recv(self, msg): + """ + Handle an incoming message in the SUB socket. - def ping(self, controller, request, done): + :param msg: The received message. + :type msg: str """ - Reply to a ping request. + ev_str, content_pickle = msg[0].split(b'\0', 1) # undo txzmq tagging + event = getattr(catalog, ev_str) + content = pickle.loads(content_pickle) + self._handle_event(event, content) - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback + def _subscribe(self, tag): """ - logger.debug("Received ping request, sending response.") - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) + Subscribe from a tag on the zmq SUB socket. + :param tag: The tag to be subscribed. + :type tag: str + """ + self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag) -class EventsClientDaemon(daemon.EventsSingletonDaemon): - """ - A daemon that listens for incoming events from server. - """ - @classmethod - def ensure(cls, port): + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. + + :param tag: The tag to be unsubscribed. + :type tag: str + """ + self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag) + + def _send(self, data): + """ + Send data through PUSH socket. + + :param data: The data to be sent. + :type event: str + """ + logger.debug("Sending data: %s" % data) + # add send() as a callback for ioloop so it works between threads + self._loop.add_callback(lambda: self._push.send(data)) + + def register(self, event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: Event + :param callback: The callback to be executed. + :type callback: callable(event, *content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a + callback identified by the given uid and replace is False. """ - Make sure the daemon is running on the given port. + self.ensure_client() + return EventsClient.register(self, event, callback, uid=uid, replace=replace) - :param port: the port in which the daemon should listen - :type port: int + def unregister(self, event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid + is removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str + """ + self.ensure_client() + EventsClient.unregister(self, event, uid=uid) + + def emit(self, event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + self.ensure_client() + EventsClient.emit(self, event, *content) + + def run(self): + """ + Run the events client. + """ + logger.debug("Starting ioloop.") + self._init_zmq() + self._initialized.set() + self._loop.start() + self._loop.close() + logger.debug("Ioloop finished.") + + def ensure_client(self): + """ + Make sure the events client thread is started. + """ + with self._lock: + if not self.is_alive(): + self.daemon = True + self.start() + self._initialized.wait() - :return: a daemon instance - :rtype: EventsClientDaemon + def shutdown(self): """ - return cls.ensure_service(port, EventsClientService()) + Shutdown the events client thread. + """ + logger.debug("Shutting down client...") + with self._lock: + if self.is_alive(): + self._loop.stop(wait=True) + EventsClient.shutdown(self) + + +def shutdown(): + """ + Shutdown the events client thread. + """ + EventsClientThread.instance().shutdown() + + +def register(event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + return EventsClientThread.instance().register( + event, callback, uid=uid, replace=replace) + + +def unregister(event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: str + :param uid: The callback uid. + :type uid: str + """ + return EventsClientThread.instance().unregister(event, uid=uid) + + +def emit(event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: str + :param content: The content of the event. + :type content: list + """ + return EventsClientThread.instance().emit(event, *content) + + +def instance(): + """ + Return an instance of the events client. + + :return: An instance of the events client. + :rtype: EventsClientThread + """ + return EventsClientThread.instance() diff --git a/src/leap/common/events/daemon.py b/src/leap/common/events/daemon.py deleted file mode 100644 index c4a4189..0000000 --- a/src/leap/common/events/daemon.py +++ /dev/null @@ -1,208 +0,0 @@ -# -*- coding: utf-8 -*- -# daemon.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -""" -A singleton daemon for running RPC services using protobuf.socketrpc. -""" - - -import logging -import threading - - -from protobuf.socketrpc.server import ( - SocketRpcServer, - ThreadedTCPServer, - SocketHandler, -) - - -logger = logging.getLogger(__name__) - - -class ServiceAlreadyRunningException(Exception): - """ - Raised whenever a service is already running in this process but someone - attemped to start it in a different port. - """ - - -class EventsRpcServer(SocketRpcServer): - """ - RPC server used in server and client interfaces to receive messages. - """ - - def __init__(self, port, host='localhost'): - """ - Initialize a RPC server. - - :param port: the port in which to listen for incoming messages - :type port: int - :param host: the address to bind to - :type host: str - """ - SocketRpcServer.__init__(self, port, host) - self._server = None - - def run(self): - """ - Run the server. - """ - logger.info('Running server on port %d.' % self.port) - # parent implementation does not hold the server instance, so we do it - # here. - self._server = ThreadedTCPServer((self.host, self.port), - SocketHandler, self) - # if we chose to use a random port, fetch the port number info. - if self.port is 0: - self.port = self._server.socket.getsockname()[1] - self._server.serve_forever() - - def stop(self): - """ - Stop the server. - """ - self._server.shutdown() - - -class EventsSingletonDaemon(threading.Thread): - """ - Singleton class for for launching and terminating a daemon. - - This class is used so every part of the mechanism that needs to listen for - messages can launch its own daemon (thread) to do the job. - """ - - # Singleton instance - __instance = None - - def __new__(cls, *args, **kwargs): - """ - Return a singleton instance if it exists or create and initialize one. - """ - if len(args) is not 2: - raise TypeError("__init__() takes exactly 2 arguments (%d given)" - % len(args)) - if cls.__instance is None: - cls.__instance = object.__new__( - EventsSingletonDaemon) - cls.__initialize(cls.__instance, args[0], args[1]) - return cls.__instance - - @staticmethod - def __initialize(self, port, service): - """ - Initialize a singleton daemon. - - This is a static method disguised as instance method that actually - does the initialization of the daemon instance. - - :param port: the port in which to listen for incoming messages - :type port: int - :param service: the service to provide in this daemon - :type service: google.protobuf.service.Service - """ - threading.Thread.__init__(self) - self._port = port - self._service = service - self._server = EventsRpcServer(self._port) - self._server.registerService(self._service) - self.daemon = True - - def __init__(self): - """ - Singleton placeholder initialization method. - - Initialization is made in __new__ so we can always return the same - instance upon object creation. - """ - pass - - @classmethod - def ensure(cls, port): - """ - Make sure the daemon instance is running. - - Each implementation of this method should call `self.ensure_service` - with the appropriate service from the `events.proto` definitions, and - return the daemon instance. - - :param port: the port in which the daemon should be listening - :type port: int - - :return: a daemon instance - :rtype: EventsSingletonDaemon - """ - raise NotImplementedError(self.ensure) - - @classmethod - def ensure_service(cls, port, service): - """ - Start the singleton instance if not already running. - - Might return ServiceAlreadyRunningException - - :param port: the port in which the daemon should be listening - :type port: int - - :return: a daemon instance - :rtype: EventsSingletonDaemon - """ - daemon = cls(port, service) - if not daemon.is_alive(): - daemon.start() - elif port and port != cls.__instance._port: - # service is running in this process but someone is trying to - # start it in another port - raise ServiceAlreadyRunningException( - "Service is already running in this process on port %d." - % self.__instance._port) - return daemon - - @classmethod - def get_instance(cls): - """ - Retrieve singleton instance of this daemon. - - :return: a daemon instance - :rtype: EventsSingletonDaemon - """ - return cls.__instance - - def run(self): - """ - Run the server. - """ - self._server.run() - - def stop(self): - """ - Stop the daemon. - """ - self._server.stop() - - def get_port(self): - """ - Retrieve the value of the port to which the service running in this - daemon is binded to. - - :return: the port to which the daemon is binded to - :rtype: int - """ - if self._port is 0: - self._port = self._server.port - return self._port diff --git a/src/leap/common/events/mac_auth.py b/src/leap/common/events/errors.py index 49d48f7..58e0014 100644 --- a/src/leap/common/events/mac_auth.py +++ b/src/leap/common/events/errors.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# mac_auth.py -# Copyright (C) 2013 LEAP +# errors.py +# Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,17 +15,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Authentication system for events. -This is not implemented yet. -""" - - -class MacMethod(object): +class CallbackAlreadyRegisteredError(Exception): """ - Representation of possible MAC authentication methods. + Raised when trying to register an already registered callback. """ - - MAC_NONE = 'none' - MAC_HMAC = 'hmac' + pass diff --git a/src/leap/common/events/events.proto b/src/leap/common/events/events.proto deleted file mode 100644 index 2371b2a..0000000 --- a/src/leap/common/events/events.proto +++ /dev/null @@ -1,145 +0,0 @@ -// signal.proto -// Copyright (C) 2013 LEAP -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package leap.common.events; -option py_generic_services = true; - - -// These are the events that can be signaled using the events mechanism. - -enum Event { - CLIENT_SESSION_ID = 1; - CLIENT_UID = 2; - SOLEDAD_CREATING_KEYS = 3; - SOLEDAD_DONE_CREATING_KEYS = 4; - SOLEDAD_UPLOADING_KEYS = 5; - SOLEDAD_DONE_UPLOADING_KEYS = 6; - SOLEDAD_DOWNLOADING_KEYS = 7; - SOLEDAD_DONE_DOWNLOADING_KEYS = 8; - SOLEDAD_NEW_DATA_TO_SYNC = 9; - SOLEDAD_DONE_DATA_SYNC = 10; - UPDATER_NEW_UPDATES = 11; - UPDATER_DONE_UPDATING = 12; - RAISE_WINDOW = 13; - SMTP_SERVICE_STARTED = 14; - SMTP_SERVICE_FAILED_TO_START = 15; - SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16; - SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17; - SMTP_RECIPIENT_REJECTED = 18; - SMTP_START_ENCRYPT_AND_SIGN = 19; - SMTP_END_ENCRYPT_AND_SIGN = 20; - SMTP_START_SIGN = 21; - SMTP_END_SIGN = 22; - SMTP_SEND_MESSAGE_START = 23; - SMTP_SEND_MESSAGE_SUCCESS = 24; - SMTP_SEND_MESSAGE_ERROR = 25; - SMTP_CONNECTION_LOST = 26; - IMAP_SERVICE_STARTED = 30; - IMAP_SERVICE_FAILED_TO_START = 31; - IMAP_CLIENT_LOGIN = 32; - IMAP_FETCHED_INCOMING = 33; - IMAP_MSG_PROCESSING = 34; - IMAP_MSG_DECRYPTED = 35; - IMAP_MSG_SAVED_LOCALLY = 36; - IMAP_MSG_DELETED_INCOMING = 37; - IMAP_UNHANDLED_ERROR = 38; - IMAP_UNREAD_MAIL = 39; - KEYMANAGER_LOOKING_FOR_KEY = 40; - KEYMANAGER_KEY_FOUND = 41; - KEYMANAGER_KEY_NOT_FOUND = 42; - KEYMANAGER_STARTED_KEY_GENERATION = 43; - KEYMANAGER_FINISHED_KEY_GENERATION = 44; - KEYMANAGER_DONE_UPLOADING_KEYS = 45; - SOLEDAD_INVALID_AUTH_TOKEN = 46; - SOLEDAD_SYNC_SEND_STATUS = 47; - SOLEDAD_SYNC_RECEIVE_STATUS = 48; -} - - -// A SignalRequest is the type of the message sent from one component to request -// that a signal be sent to every registered component. - -message SignalRequest { - required Event event = 1; - required string content = 2; - required string mac_method = 3; - required bytes mac = 4; - optional string enc_method = 5; - optional bool error_occurred = 6; -} - - -// A RegisterRequest message tells the server that a component wants to -// be signaled whenever a specific event occurs. - -message RegisterRequest { - required Event event = 1; - required int32 port = 2; - required string mac_method = 3; - required bytes mac = 4; -} - - -// An UnregisterRequest message tells the server that a component does not -// want to be signaled when a specific event occurs. - -message UnregisterRequest { - required Event event = 1; - required int32 port = 2; - required string mac_method = 3; - required bytes mac = 4; -} - - -// A PingRequest message is used to find out if a server or component is -// alive. - -message PingRequest { -} - - -// The EventResponse is the message sent back by server and components after -// they receive other kinds of requests. - -message EventResponse { - - enum Status { - OK = 1; - UNAUTH = 2; - ERROR = 3; - } - - required Status status = 1; - optional string result = 2; -} - - -// The EventsServerService is the service provided by the server. - -service EventsServerService { - rpc ping(PingRequest) returns (EventResponse); - rpc register(RegisterRequest) returns (EventResponse); - rpc unregister(UnregisterRequest) returns (EventResponse); - rpc signal(SignalRequest) returns (EventResponse); -} - - -// EventsComponentService is the service provided by components (clients). - -service EventsClientService { - rpc ping(PingRequest) returns (EventResponse); - rpc signal(SignalRequest) returns (EventResponse); -} diff --git a/src/leap/common/events/events_pb2.py b/src/leap/common/events/events_pb2.py deleted file mode 100644 index 9692ea1..0000000 --- a/src/leap/common/events/events_pb2.py +++ /dev/null @@ -1,635 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! - -from google.protobuf import descriptor -from google.protobuf import message -from google.protobuf import reflection -from google.protobuf import service -from google.protobuf import service_reflection -from google.protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - - - -DESCRIPTOR = descriptor.FileDescriptor( - name='events.proto', - package='leap.common.events', - serialized_pb='\n\x0c\x65vents.proto\x12\x12leap.common.events\"\x97\x01\n\rSignalRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0f\n\x07\x63ontent\x18\x02 \x02(\t\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\x12\x12\n\nenc_method\x18\x05 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x06 \x01(\x08\"j\n\x0fRegisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"l\n\x11UnregisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"\r\n\x0bPingRequest\"\x82\x01\n\rEventResponse\x12\x38\n\x06status\x18\x01 \x02(\x0e\x32(.leap.common.events.EventResponse.Status\x12\x0e\n\x06result\x18\x02 \x01(\t\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03*\x9f\n\n\x05\x45vent\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\x12\x17\n\x13UPDATER_NEW_UPDATES\x10\x0b\x12\x19\n\x15UPDATER_DONE_UPDATING\x10\x0c\x12\x10\n\x0cRAISE_WINDOW\x10\r\x12\x18\n\x14SMTP_SERVICE_STARTED\x10\x0e\x12 \n\x1cSMTP_SERVICE_FAILED_TO_START\x10\x0f\x12%\n!SMTP_RECIPIENT_ACCEPTED_ENCRYPTED\x10\x10\x12\'\n#SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED\x10\x11\x12\x1b\n\x17SMTP_RECIPIENT_REJECTED\x10\x12\x12\x1f\n\x1bSMTP_START_ENCRYPT_AND_SIGN\x10\x13\x12\x1d\n\x19SMTP_END_ENCRYPT_AND_SIGN\x10\x14\x12\x13\n\x0fSMTP_START_SIGN\x10\x15\x12\x11\n\rSMTP_END_SIGN\x10\x16\x12\x1b\n\x17SMTP_SEND_MESSAGE_START\x10\x17\x12\x1d\n\x19SMTP_SEND_MESSAGE_SUCCESS\x10\x18\x12\x1b\n\x17SMTP_SEND_MESSAGE_ERROR\x10\x19\x12\x18\n\x14SMTP_CONNECTION_LOST\x10\x1a\x12\x18\n\x14IMAP_SERVICE_STARTED\x10\x1e\x12 \n\x1cIMAP_SERVICE_FAILED_TO_START\x10\x1f\x12\x15\n\x11IMAP_CLIENT_LOGIN\x10 \x12\x19\n\x15IMAP_FETCHED_INCOMING\x10!\x12\x17\n\x13IMAP_MSG_PROCESSING\x10\"\x12\x16\n\x12IMAP_MSG_DECRYPTED\x10#\x12\x1a\n\x16IMAP_MSG_SAVED_LOCALLY\x10$\x12\x1d\n\x19IMAP_MSG_DELETED_INCOMING\x10%\x12\x18\n\x14IMAP_UNHANDLED_ERROR\x10&\x12\x14\n\x10IMAP_UNREAD_MAIL\x10\'\x12\x1e\n\x1aKEYMANAGER_LOOKING_FOR_KEY\x10(\x12\x18\n\x14KEYMANAGER_KEY_FOUND\x10)\x12\x1c\n\x18KEYMANAGER_KEY_NOT_FOUND\x10*\x12%\n!KEYMANAGER_STARTED_KEY_GENERATION\x10+\x12&\n\"KEYMANAGER_FINISHED_KEY_GENERATION\x10,\x12\"\n\x1eKEYMANAGER_DONE_UPLOADING_KEYS\x10-\x12\x1e\n\x1aSOLEDAD_INVALID_AUTH_TOKEN\x10.\x12\x1c\n\x18SOLEDAD_SYNC_SEND_STATUS\x10/\x12\x1f\n\x1bSOLEDAD_SYNC_RECEIVE_STATUS\x10\x30\x32\xdd\x02\n\x13\x45ventsServerService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12R\n\x08register\x12#.leap.common.events.RegisterRequest\x1a!.leap.common.events.EventResponse\x12V\n\nunregister\x12%.leap.common.events.UnregisterRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse2\xb1\x01\n\x13\x45ventsClientService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponseB\x03\x90\x01\x01') - -_EVENT = descriptor.EnumDescriptor( - name='Event', - full_name='leap.common.events.Event', - filename=None, - file=DESCRIPTOR, - values=[ - descriptor.EnumValueDescriptor( - name='CLIENT_SESSION_ID', index=0, number=1, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='CLIENT_UID', index=1, number=2, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_CREATING_KEYS', index=2, number=3, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_CREATING_KEYS', index=3, number=4, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_UPLOADING_KEYS', index=4, number=5, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_UPLOADING_KEYS', index=5, number=6, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DOWNLOADING_KEYS', index=6, number=7, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_DOWNLOADING_KEYS', index=7, number=8, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_NEW_DATA_TO_SYNC', index=8, number=9, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_DATA_SYNC', index=9, number=10, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='UPDATER_NEW_UPDATES', index=10, number=11, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='UPDATER_DONE_UPDATING', index=11, number=12, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='RAISE_WINDOW', index=12, number=13, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SERVICE_STARTED', index=13, number=14, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SERVICE_FAILED_TO_START', index=14, number=15, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_RECIPIENT_ACCEPTED_ENCRYPTED', index=15, number=16, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED', index=16, number=17, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_RECIPIENT_REJECTED', index=17, number=18, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_START_ENCRYPT_AND_SIGN', index=18, number=19, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_END_ENCRYPT_AND_SIGN', index=19, number=20, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_START_SIGN', index=20, number=21, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_END_SIGN', index=21, number=22, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SEND_MESSAGE_START', index=22, number=23, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SEND_MESSAGE_SUCCESS', index=23, number=24, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SEND_MESSAGE_ERROR', index=24, number=25, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_CONNECTION_LOST', index=25, number=26, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_SERVICE_STARTED', index=26, number=30, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_SERVICE_FAILED_TO_START', index=27, number=31, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_CLIENT_LOGIN', index=28, number=32, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_FETCHED_INCOMING', index=29, number=33, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_PROCESSING', index=30, number=34, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_DECRYPTED', index=31, number=35, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_SAVED_LOCALLY', index=32, number=36, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_DELETED_INCOMING', index=33, number=37, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_UNHANDLED_ERROR', index=34, number=38, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_UNREAD_MAIL', index=35, number=39, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_LOOKING_FOR_KEY', index=36, number=40, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_KEY_FOUND', index=37, number=41, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_KEY_NOT_FOUND', index=38, number=42, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_STARTED_KEY_GENERATION', index=39, number=43, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_FINISHED_KEY_GENERATION', index=40, number=44, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_DONE_UPLOADING_KEYS', index=41, number=45, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_INVALID_AUTH_TOKEN', index=42, number=46, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_SYNC_SEND_STATUS', index=43, number=47, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_SYNC_RECEIVE_STATUS', index=44, number=48, - options=None, - type=None), - ], - containing_type=None, - options=None, - serialized_start=557, - serialized_end=1868, -) - - -CLIENT_SESSION_ID = 1 -CLIENT_UID = 2 -SOLEDAD_CREATING_KEYS = 3 -SOLEDAD_DONE_CREATING_KEYS = 4 -SOLEDAD_UPLOADING_KEYS = 5 -SOLEDAD_DONE_UPLOADING_KEYS = 6 -SOLEDAD_DOWNLOADING_KEYS = 7 -SOLEDAD_DONE_DOWNLOADING_KEYS = 8 -SOLEDAD_NEW_DATA_TO_SYNC = 9 -SOLEDAD_DONE_DATA_SYNC = 10 -UPDATER_NEW_UPDATES = 11 -UPDATER_DONE_UPDATING = 12 -RAISE_WINDOW = 13 -SMTP_SERVICE_STARTED = 14 -SMTP_SERVICE_FAILED_TO_START = 15 -SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16 -SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17 -SMTP_RECIPIENT_REJECTED = 18 -SMTP_START_ENCRYPT_AND_SIGN = 19 -SMTP_END_ENCRYPT_AND_SIGN = 20 -SMTP_START_SIGN = 21 -SMTP_END_SIGN = 22 -SMTP_SEND_MESSAGE_START = 23 -SMTP_SEND_MESSAGE_SUCCESS = 24 -SMTP_SEND_MESSAGE_ERROR = 25 -SMTP_CONNECTION_LOST = 26 -IMAP_SERVICE_STARTED = 30 -IMAP_SERVICE_FAILED_TO_START = 31 -IMAP_CLIENT_LOGIN = 32 -IMAP_FETCHED_INCOMING = 33 -IMAP_MSG_PROCESSING = 34 -IMAP_MSG_DECRYPTED = 35 -IMAP_MSG_SAVED_LOCALLY = 36 -IMAP_MSG_DELETED_INCOMING = 37 -IMAP_UNHANDLED_ERROR = 38 -IMAP_UNREAD_MAIL = 39 -KEYMANAGER_LOOKING_FOR_KEY = 40 -KEYMANAGER_KEY_FOUND = 41 -KEYMANAGER_KEY_NOT_FOUND = 42 -KEYMANAGER_STARTED_KEY_GENERATION = 43 -KEYMANAGER_FINISHED_KEY_GENERATION = 44 -KEYMANAGER_DONE_UPLOADING_KEYS = 45 -SOLEDAD_INVALID_AUTH_TOKEN = 46 -SOLEDAD_SYNC_SEND_STATUS = 47 -SOLEDAD_SYNC_RECEIVE_STATUS = 48 - - -_EVENTRESPONSE_STATUS = descriptor.EnumDescriptor( - name='Status', - full_name='leap.common.events.EventResponse.Status', - filename=None, - file=DESCRIPTOR, - values=[ - descriptor.EnumValueDescriptor( - name='OK', index=0, number=1, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='UNAUTH', index=1, number=2, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='ERROR', index=2, number=3, - options=None, - type=None), - ], - containing_type=None, - options=None, - serialized_start=515, - serialized_end=554, -) - - -_SIGNALREQUEST = descriptor.Descriptor( - name='SignalRequest', - full_name='leap.common.events.SignalRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='event', full_name='leap.common.events.SignalRequest.event', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='content', full_name='leap.common.events.SignalRequest.content', index=1, - number=2, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac_method', full_name='leap.common.events.SignalRequest.mac_method', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac', full_name='leap.common.events.SignalRequest.mac', index=3, - number=4, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='enc_method', full_name='leap.common.events.SignalRequest.enc_method', index=4, - number=5, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='error_occurred', full_name='leap.common.events.SignalRequest.error_occurred', index=5, - number=6, type=8, cpp_type=7, label=1, - has_default_value=False, default_value=False, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=37, - serialized_end=188, -) - - -_REGISTERREQUEST = descriptor.Descriptor( - name='RegisterRequest', - full_name='leap.common.events.RegisterRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='event', full_name='leap.common.events.RegisterRequest.event', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='port', full_name='leap.common.events.RegisterRequest.port', index=1, - number=2, type=5, cpp_type=1, label=2, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac_method', full_name='leap.common.events.RegisterRequest.mac_method', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac', full_name='leap.common.events.RegisterRequest.mac', index=3, - number=4, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=190, - serialized_end=296, -) - - -_UNREGISTERREQUEST = descriptor.Descriptor( - name='UnregisterRequest', - full_name='leap.common.events.UnregisterRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='event', full_name='leap.common.events.UnregisterRequest.event', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='port', full_name='leap.common.events.UnregisterRequest.port', index=1, - number=2, type=5, cpp_type=1, label=2, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac_method', full_name='leap.common.events.UnregisterRequest.mac_method', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac', full_name='leap.common.events.UnregisterRequest.mac', index=3, - number=4, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=298, - serialized_end=406, -) - - -_PINGREQUEST = descriptor.Descriptor( - name='PingRequest', - full_name='leap.common.events.PingRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=408, - serialized_end=421, -) - - -_EVENTRESPONSE = descriptor.Descriptor( - name='EventResponse', - full_name='leap.common.events.EventResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='status', full_name='leap.common.events.EventResponse.status', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='result', full_name='leap.common.events.EventResponse.result', index=1, - number=2, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - _EVENTRESPONSE_STATUS, - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=424, - serialized_end=554, -) - -_SIGNALREQUEST.fields_by_name['event'].enum_type = _EVENT -_REGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT -_UNREGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT -_EVENTRESPONSE.fields_by_name['status'].enum_type = _EVENTRESPONSE_STATUS -_EVENTRESPONSE_STATUS.containing_type = _EVENTRESPONSE; -DESCRIPTOR.message_types_by_name['SignalRequest'] = _SIGNALREQUEST -DESCRIPTOR.message_types_by_name['RegisterRequest'] = _REGISTERREQUEST -DESCRIPTOR.message_types_by_name['UnregisterRequest'] = _UNREGISTERREQUEST -DESCRIPTOR.message_types_by_name['PingRequest'] = _PINGREQUEST -DESCRIPTOR.message_types_by_name['EventResponse'] = _EVENTRESPONSE - -class SignalRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _SIGNALREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.SignalRequest) - -class RegisterRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _REGISTERREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.RegisterRequest) - -class UnregisterRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _UNREGISTERREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.UnregisterRequest) - -class PingRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _PINGREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.PingRequest) - -class EventResponse(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _EVENTRESPONSE - - # @@protoc_insertion_point(class_scope:leap.common.events.EventResponse) - - -_EVENTSSERVERSERVICE = descriptor.ServiceDescriptor( - name='EventsServerService', - full_name='leap.common.events.EventsServerService', - file=DESCRIPTOR, - index=0, - options=None, - serialized_start=1871, - serialized_end=2220, - methods=[ - descriptor.MethodDescriptor( - name='ping', - full_name='leap.common.events.EventsServerService.ping', - index=0, - containing_service=None, - input_type=_PINGREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='register', - full_name='leap.common.events.EventsServerService.register', - index=1, - containing_service=None, - input_type=_REGISTERREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='unregister', - full_name='leap.common.events.EventsServerService.unregister', - index=2, - containing_service=None, - input_type=_UNREGISTERREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='signal', - full_name='leap.common.events.EventsServerService.signal', - index=3, - containing_service=None, - input_type=_SIGNALREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), -]) - -class EventsServerService(service.Service): - __metaclass__ = service_reflection.GeneratedServiceType - DESCRIPTOR = _EVENTSSERVERSERVICE -class EventsServerService_Stub(EventsServerService): - __metaclass__ = service_reflection.GeneratedServiceStubType - DESCRIPTOR = _EVENTSSERVERSERVICE - - -_EVENTSCLIENTSERVICE = descriptor.ServiceDescriptor( - name='EventsClientService', - full_name='leap.common.events.EventsClientService', - file=DESCRIPTOR, - index=1, - options=None, - serialized_start=2223, - serialized_end=2400, - methods=[ - descriptor.MethodDescriptor( - name='ping', - full_name='leap.common.events.EventsClientService.ping', - index=0, - containing_service=None, - input_type=_PINGREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='signal', - full_name='leap.common.events.EventsClientService.signal', - index=1, - containing_service=None, - input_type=_SIGNALREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), -]) - -class EventsClientService(service.Service): - __metaclass__ = service_reflection.GeneratedServiceType - DESCRIPTOR = _EVENTSCLIENTSERVICE -class EventsClientService_Stub(EventsClientService): - __metaclass__ = service_reflection.GeneratedServiceStubType - DESCRIPTOR = _EVENTSCLIENTSERVICE - -# @@protoc_insertion_point(module_scope) diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index 41aede3..a69202e 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -14,223 +14,79 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -A server for the events mechanism. - -A server can receive different kinds of requests from clients: - 1. Registration request: store client port number to be notified when - a specific signal arrives. - 2. Signal request: redistribute the signal to registered clients. """ -import logging -import socket +The server for the events mechanism. +""" -from protobuf.socketrpc import RpcService -from leap.common.events import ( - events_pb2 as proto, - daemon, -) +import logging +import txzmq +from leap.common.zmq_utils import zmq_has_curve -logger = logging.getLogger(__name__) +from leap.common.events.zmq_components import TxZmqServerComponent -SERVER_PORT = 8090 +if zmq_has_curve(): + EMIT_ADDR = "tcp://127.0.0.1:9000" + REG_ADDR = "tcp://127.0.0.1:9001" +else: + EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0" + REG_ADDR = "ipc:///tmp/leap.common.events.socket.1" -# the `registered_clients` dictionary below should have the following -# format: -# -# { event_signal: [ port, ... ], ... } -# -registered_clients = {} - -class PortAlreadyTaken(Exception): - """ - Raised when trying to open a server in a port that is already taken. - """ - pass +logger = logging.getLogger(__name__) -def ensure_server(port=SERVER_PORT): +def ensure_server(emit_addr=EMIT_ADDR, reg_addr=REG_ADDR): """ - Make sure the server is running on the given port. + Make sure the server is running in the given addresses. - Attempt to connect to given local port. Upon success, assume that the - events server has already been started. Upon failure, start events server. + :param emit_addr: The address in which to receive events from clients. + :type emit_addr: str + :param reg_addr: The address to which publish events to clients. + :type reg_addr: str - :param port: the port in which server should be listening - :type port: int - - :return: the daemon instance or nothing - :rtype: EventsServerDaemon or None - - :raise PortAlreadyTaken: Raised if C{port} is already taken by something - that is not an events server. - """ - try: - # check if port is available - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', port)) - s.close() - # port is taken, check if there's a server running there - response = ping(port=port, timeout=1000) - if response is not None and response.status == proto.EventResponse.OK: - logger.info('A server is already running on port %d.', port) - return - # port is taken, and not by an events server - logger.warning( - 'Port %d is taken by something not an events server.', port) - raise PortAlreadyTaken(port) - except socket.error: - # port is available, run a server - logger.info('Launching server on port %d.', port) - return EventsServerDaemon.ensure(port) - - -def ping(port=SERVER_PORT, reqcbk=None, timeout=1000): + :return: an events server instance + :rtype: EventsServer """ - Ping the server. - - :param port: the port in which server should be listening - :type port: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - request = proto.PingRequest() - service = RpcService( - proto.EventsServerService_Stub, - port, - 'localhost') - logger.debug("Pinging server in port %d..." % port) - return service.ping(request, callback=reqcbk, timeout=timeout) + _server = EventsServer(emit_addr, reg_addr) + return _server -class EventsServerService(proto.EventsServerService): +class EventsServer(TxZmqServerComponent): """ - Service for receiving events in clients. + An events server that listens for events in one address and publishes those + events in another address. """ - def register(self, controller, request, done): - """ - Register a client port to be signaled when specific events come in. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.info("Received registration request: %s..." % str(request)[:40]) - # add client port to signal list - if request.event not in registered_clients: - registered_clients[request.event] = set([]) - registered_clients[request.event].add(request.port) - # send response back to client - - logger.debug('sending response back') - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - - def unregister(self, controller, request, done): - """ - Unregister a client port so it will not be signaled when specific - events come in. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.info( - "Received unregistration request: %s..." % str(request)[:40]) - # remove client port from signal list - response = proto.EventResponse() - if request.event in registered_clients: - try: - registered_clients[request.event].remove(request.port) - response.status = proto.EventResponse.OK - except KeyError: - response.status = proto.EventsResponse.ERROR - response.result = 'Port %d not registered.' % request.port - # send response back to client - logger.debug('sending response back') - done.run(response) - - def signal(self, controller, request, done): - """ - Perform an RPC call to signal all clients registered to receive a - specific signal. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.SignalRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.debug('Received signal from client: %s...', str(request)[:40]) - # send signal to all registered clients - # TODO: verify signal auth - if request.event in registered_clients: - for port in registered_clients[request.event]: - - def callback(req, resp): - logger.debug("Signal received by " + str(port)) - - service = RpcService(proto.EventsClientService_Stub, - port, 'localhost') - service.signal(request, callback=callback) - # send response back to client - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - - def ping(self, controller, request, done): + def __init__(self, emit_addr, reg_addr): """ - Reply to a ping request. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.debug("Received ping request, sending response.") - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - + Initialize the events server. -class EventsServerDaemon(daemon.EventsSingletonDaemon): - """ - Singleton class for starting an events server daemon. - """ - - @classmethod - def ensure(cls, port): + :param emit_addr: The address in which to receive events from clients. + :type emit_addr: str + :param reg_addr: The address to which publish events to clients. + :type reg_addr: str """ - Make sure the daemon is running on the given port. - - :param port: the port in which the daemon should listen - :type port: int + TxZmqServerComponent.__init__(self) + # bind PULL and PUB sockets + self._pull, self.pull_port = self._zmq_bind( + txzmq.ZmqPullConnection, emit_addr) + self._pub, self.pub_port = self._zmq_bind( + txzmq.ZmqPubConnection, reg_addr) + # set a handler for arriving messages + self._pull.onPull = self._onPull + + def _onPull(self, message): + """ + Callback executed when a message is pulled from a client. - :return: a daemon instance - :rtype: EventsServerDaemon + :param message: The message sent by the client. + :type message: str """ - return cls.ensure_service(port, EventsServerService()) + event, content = message[0].split(b"\0", 1) + logger.debug("Publishing event: %s" % event) + self._pub.publish(content, tag=event) diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py new file mode 100644 index 0000000..8206ed5 --- /dev/null +++ b/src/leap/common/events/txclient.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +# txclient.py +# Copyright (C) 2013, 2014, 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +The client end point of the events mechanism, implemented using txzmq. + +Clients are the communicating parties of the events mechanism. They +communicate by sending messages to a server, which in turn redistributes +messages to other clients. + +When a client registers a callback for a given event, it also tells the +server that it wants to be notified whenever events of that type are sent by +some other client. +""" + + +import logging +import pickle + +import txzmq + +from leap.common.events.zmq_components import TxZmqClientComponent +from leap.common.events.client import EventsClient +from leap.common.events.client import configure_client +from leap.common.events.server import EMIT_ADDR +from leap.common.events.server import REG_ADDR +from leap.common.events import catalog + + +logger = logging.getLogger(__name__) + + +__all__ = [ + "configure_client", + "EventsTxClient", + "register", + "unregister", + "emit", + "shutdown", +] + + +class EventsTxClient(TxZmqClientComponent, EventsClient): + """ + A twisted events client that listens for events in one address and + publishes those events to another address. + """ + + def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR, + path_prefix=None): + """ + Initialize the events server. + """ + TxZmqClientComponent.__init__(self, path_prefix=path_prefix) + EventsClient.__init__(self, emit_addr, reg_addr) + # connect SUB first, otherwise we might miss some event sent from this + # same client + self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr) + self._sub.gotMessage = self._gotMessage + self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr) + + def _gotMessage(self, msg, tag): + """ + Handle an incoming event. + + :param msg: The incoming message. + :type msg: list(str) + """ + event = getattr(catalog, tag) + content = pickle.loads(msg) + self._handle_event(event, content) + + def _subscribe(self, tag): + """ + Subscribe to a tag on the zmq SUB socket. + + :param tag: The tag to be subscribed. + :type tag: str + """ + self._sub.subscribe(tag) + + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. + + :param tag: The tag to be unsubscribed. + :type tag: str + """ + self._sub.unsubscribe(tag) + + def _send(self, data): + """ + Send data through PUSH socket. + + :param data: The data to be sent. + :type event: str + """ + self._push.send(data) + + def shutdown(self): + TxZmqClientComponent.shutdown(self) + EventsClient.shutdown(self) + + +def register(event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + return EventsTxClient.instance().register( + event, callback, uid=uid, replace=replace) + + +def unregister(event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: str + :param uid: The callback uid. + :type uid: str + """ + return EventsTxClient.instance().unregister(event, uid=uid) + + +def emit(event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: str + :param content: The content of the event. + :type content: list + """ + return EventsTxClient.instance().emit(event, *content) + + +def shutdown(): + """ + Shutdown the events client. + """ + EventsTxClient.instance().shutdown() + + +def instance(): + """ + Return an instance of the events client. + + :return: An instance of the events client. + :rtype: EventsClientThread + """ + return EventsTxClient.instance() diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py new file mode 100644 index 0000000..4fb95d3 --- /dev/null +++ b/src/leap/common/events/zmq_components.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- +# zmq.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +The server for the events mechanism. +""" + + +import os +import logging +import txzmq +import re + +from abc import ABCMeta + +# XXX some distros don't package libsodium, so we have to be prepared for +# absence of zmq.auth +try: + import zmq.auth + from zmq.auth.thread import ThreadAuthenticator +except ImportError: + pass + +from leap.common.config import get_path_prefix +from leap.common.zmq_utils import zmq_has_curve +from leap.common.zmq_utils import maybe_create_and_get_certificates +from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX + + +logger = logging.getLogger(__name__) + + +ADDRESS_RE = re.compile("(.+)://(.+):([0-9]+)") + + +class TxZmqComponent(object): + """ + A twisted-powered zmq events component. + """ + + __metaclass__ = ABCMeta + + _component_type = None + + def __init__(self, path_prefix=None): + """ + Initialize the txzmq component. + """ + self._factory = txzmq.ZmqFactory() + self._factory.registerForShutdown() + if path_prefix == None: + path_prefix = get_path_prefix() + self._config_prefix = os.path.join(path_prefix, "leap", "events") + self._connections = [] + + @property + def component_type(self): + if not self._component_type: + raise Exception( + "Make sure implementations of TxZmqComponent" + "define a self._component_type!") + return self._component_type + + def _zmq_connect(self, connClass, address): + """ + Connect to an address. + + :param connClass: The connection class to be used. + :type connClass: txzmq.ZmqConnection + :param address: The address to connect to. + :type address: str + + :return: The binded connection. + :rtype: txzmq.ZmqConnection + """ + connection = connClass(self._factory) + # create and configure socket + socket = connection.socket + if zmq_has_curve(): + public, secret = maybe_create_and_get_certificates( + self._config_prefix, self.component_type) + server_public_file = os.path.join( + self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) + socket.curve_publickey = public + socket.curve_secretkey = secret + socket.curve_serverkey = server_public + socket.connect(address) + logger.debug("Connected %s to %s." % (connClass, address)) + self._connections.append(connection) + return connection + + def _zmq_bind(self, connClass, address): + """ + Bind to an address. + + :param connClass: The connection class to be used. + :type connClass: txzmq.ZmqConnection + :param address: The address to bind to. + :type address: str + + :return: The binded connection and port. + :rtype: (txzmq.ZmqConnection, int) + """ + connection = connClass(self._factory) + socket = connection.socket + if zmq_has_curve(): + public, secret = maybe_create_and_get_certificates( + self._config_prefix, self.component_type) + socket.curve_publickey = public + socket.curve_secretkey = secret + self._start_thread_auth(connection.socket) + # check if port was given + protocol, addr, port = ADDRESS_RE.match(address).groups() + if port == "0": + port = socket.bind_to_random_port("%s://%s" % (protocol, addr)) + else: + socket.bind(address) + port = int(port) + logger.debug("Binded %s to %s://%s:%d." + % (connClass, protocol, addr, port)) + self._connections.append(connection) + return connection, port + + def _start_thread_auth(self, socket): + """ + Start the zmq curve thread authenticator. + + :param socket: The socket in which to configure the authenticator. + :type socket: zmq.Socket + """ + authenticator = ThreadAuthenticator(self._factory.context) + authenticator.start() + # XXX do not hardcode this here. + authenticator.allow('127.0.0.1') + # tell authenticator to use the certificate in a directory + public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX) + authenticator.configure_curve(domain="*", location=public_keys_dir) + socket.curve_server = True # must come before bind + + def shutdown(self): + """ + Shutdown the component. + """ + logger.debug("Shutting down component %s." % str(self)) + for conn in self._connections: + conn.shutdown() + self._factory.shutdown() + + +class TxZmqServerComponent(TxZmqComponent): + """ + A txZMQ server component. + """ + + _component_type = "server" + + +class TxZmqClientComponent(TxZmqComponent): + """ + A txZMQ client component. + """ + + _component_type = "client" diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index 0779b2e..7ef3e1b 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -15,414 +15,156 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import unittest -import sets -import time -import socket -import threading -import random - - -from mock import Mock -from protobuf.socketrpc import RpcService -from leap.common import events -from leap.common.events import ( - server, - client, - mac_auth, -) -from leap.common.events.events_pb2 import ( - EventsServerService, - EventsServerService_Stub, - EventsClientService_Stub, - EventResponse, - SignalRequest, - RegisterRequest, - PingRequest, - SOLEDAD_CREATING_KEYS, - CLIENT_UID, -) +import os +import logging +import time -port = 8090 +from twisted.trial import unittest +from twisted.internet import defer -received = False +from leap.common.events import server +from leap.common.events import client +from leap.common.events import txclient +from leap.common.events import catalog +from leap.common.events.errors import CallbackAlreadyRegisteredError -class EventsTestCase(unittest.TestCase): +if 'DEBUG' in os.environ: + logging.basicConfig(level=logging.DEBUG) - @classmethod - def setUpClass(cls): - server.EventsServerDaemon.ensure(8090) - cls.callbacks = events.client.registered_callbacks - @classmethod - def tearDownClass(cls): - # give some time for requests to be processed. - time.sleep(1) +class EventsGenericClientTestCase(object): def setUp(self): - super(EventsTestCase, self).setUp() + self._server = server.ensure_server( + emit_addr="tcp://127.0.0.1:0", + reg_addr="tcp://127.0.0.1:0") + self._client.configure_client( + emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, + reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port) def tearDown(self): - #events.client.registered_callbacks = {} - server.registered_clients = {} - super(EventsTestCase, self).tearDown() - - def test_service_singleton(self): - """ - Ensure that there's always just one instance of the server daemon - running. - """ - service1 = server.EventsServerDaemon.ensure(8090) - service2 = server.EventsServerDaemon.ensure(8090) - self.assertEqual(service1, service2, - "Can't get singleton class for service.") + self._client.shutdown() + self._server.shutdown() + # wait a bit for sockets to close properly + time.sleep(0.1) def test_client_register(self): """ Ensure clients can register callbacks. """ - self.assertTrue(1 not in self.callbacks, - 'There should should be no callback for this signal.') - events.register(1, lambda x: True) - self.assertTrue(1 in self.callbacks, - 'Could not register signal in local client.') - events.register(2, lambda x: True) - self.assertTrue(1 in self.callbacks, - 'Could not register signal in local client.') - self.assertTrue(2 in self.callbacks, - 'Could not register signal in local client.') + callbacks = self._client.instance().callbacks + self.assertTrue(len(callbacks) == 0, + 'There should be no callback for this event.') + # register one event + event1 = catalog.CLIENT_UID + cbk1 = lambda event, _: True + uid1 = self._client.register(event1, cbk1) + # assert for correct registration + self.assertTrue(len(callbacks) == 1) + self.assertTrue(callbacks[event1][uid1] == cbk1, + 'Could not register event in local client.') + # register another event + event2 = catalog.CLIENT_SESSION_ID + cbk2 = lambda event, _: True + uid2 = self._client.register(event2, cbk2) + # assert for correct registration + self.assertTrue(len(callbacks) == 2) + self.assertTrue(callbacks[event2][uid2] == cbk2, + 'Could not register event in local client.') def test_register_signal_replace(self): """ Make sure clients can replace already registered callbacks. """ - sig = 3 - cbk = lambda x: True - events.register(sig, cbk, uid=1) - self.assertRaises(Exception, events.register, sig, lambda x: True, - uid=1) - events.register(sig, lambda x: True, uid=1, replace=True) - self.assertTrue(sig in self.callbacks, 'Could not register signal.') - self.assertEqual(1, len(self.callbacks[sig]), - 'Wrong number of registered callbacks.') - - def test_signal_response_status(self): - """ - Ensure there's an appropriate response from server when signaling. - """ - sig = 4 - request = SignalRequest() - request.event = sig - request.content = 'my signal contents' - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - # test synch - response = service.signal(request, timeout=1000) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') + event = catalog.CLIENT_UID + d = defer.Deferred() + cbk_fail = lambda event, _: d.errback(event) + cbk_succeed = lambda event, _: d.callback(event) + self._client.register(event, cbk_fail, uid=1) + self._client.register(event, cbk_succeed, uid=1, replace=True) + self._client.emit(event, None) + return d - def test_signal_executes_callback(self): - """ - Ensure callback is executed upon receiving signal. + def test_register_signal_replace_fails_when_replace_is_false(self): """ - sig = CLIENT_UID - request = SignalRequest() - request.event = sig - request.content = 'my signal contents' - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - - # register a callback - flag = Mock() - events.register(sig, lambda req: flag(req.event)) - # signal - response = service.signal(request) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - time.sleep(1) # wait for signal to arrive - flag.assert_called_once_with(sig) - - def test_events_server_service_register(self): + Make sure clients trying to replace already registered callbacks fail + when replace=False """ - Ensure the server can register clients to be signaled. - """ - sig = 5 - request = RegisterRequest() - request.event = sig - request.port = 8091 - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - complist = server.registered_clients - self.assertEqual({}, complist, - 'There should be no registered_ports when ' - 'server has just been created.') - response = service.register(request, timeout=1000) - self.assertTrue(sig in complist, "Signal not registered succesfully.") - self.assertTrue(8091 in complist[sig], - 'Failed registering client port.') + event = catalog.CLIENT_UID + self._client.register(event, lambda event, _: None, uid=1) + self.assertRaises( + CallbackAlreadyRegisteredError, + self._client.register, + event, lambda event, _: None, uid=1, replace=False) - def test_client_request_register(self): + def test_register_more_than_one_callback_works(self): """ - Ensure clients can register themselves with server. + Make sure clients can replace already registered callbacks. """ - sig = 6 - complist = server.registered_clients - self.assertTrue(sig not in complist, - 'There should be no registered clients for this ' - 'signal.') - events.register(sig, lambda x: True) - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertTrue(sig in complist, 'Failed registering client.') - self.assertTrue(port in complist[sig], - 'Failed registering client port.') + event = catalog.CLIENT_UID + d1 = defer.Deferred() + cbk1 = lambda event, _: d1.callback(event) + d2 = defer.Deferred() + cbk2 = lambda event, _: d2.callback(event) + self._client.register(event, cbk1) + self._client.register(event, cbk2) + self._client.emit(event, None) + d = defer.gatherResults([d1, d2]) + return d def test_client_receives_signal(self): """ Ensure clients can receive signals. """ - sig = 7 - flag = Mock() - - events.register(sig, lambda req: flag(req.event)) - request = SignalRequest() - request.event = sig - request.content = "" - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - response = service.signal(request, timeout=1000) - self.assertTrue(response is not None, 'Did not receive response.') - time.sleep(0.5) - flag.assert_called_once_with(sig) - - def test_client_send_signal(self): - """ - Ensure clients can send signals. - """ - sig = 8 - response = events.signal(sig) - self.assertTrue(response.status == response.OK, - 'Received wrong response status when signaling.') + event = catalog.CLIENT_UID + d = defer.Deferred() + def cbk(events, _): + d.callback(event) + self._client.register(event, cbk) + self._client.emit(event, None) + return d def test_client_unregister_all(self): """ Test that the client can unregister all events for one signal. """ - sig = CLIENT_UID - complist = server.registered_clients - events.register(sig, lambda x: True) - events.register(sig, lambda x: True) - time.sleep(0.1) - events.unregister(sig) - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertFalse(bool(complist[sig])) - self.assertTrue(port not in complist[sig]) + event1 = catalog.CLIENT_UID + d = defer.Deferred() + # register more than one callback for the same event + self._client.register(event1, lambda ev, _: d.errback(None)) + self._client.register(event1, lambda ev, _: d.errback(None)) + # unregister and emit the event + self._client.unregister(event1) + self._client.emit(event1, None) + # register and emit another event so the deferred can succeed + event2 = catalog.CLIENT_SESSION_ID + self._client.register(event2, lambda ev, _: d.callback(None)) + self._client.emit(event2, None) + return d def test_client_unregister_by_uid(self): """ Test that the client can unregister an event by uid. """ - sig = CLIENT_UID - complist = server.registered_clients - events.register(sig, lambda x: True, uid='cbkuid') - events.register(sig, lambda x: True, uid='cbkuid2') - time.sleep(0.1) - events.unregister(sig, uid='cbkuid') - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertTrue(sig in complist) - self.assertTrue(len(complist[sig]) == 1) - self.assertTrue( - client.registered_callbacks[sig].pop()[0] == 'cbkuid2') - self.assertTrue(port in complist[sig]) - - def test_server_replies_ping(self): - """ - Ensure server replies to a ping. - """ - request = PingRequest() - service = RpcService(EventsServerService_Stub, port, 'localhost') - response = service.ping(request, timeout=1000) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_client_replies_ping(self): - """ - Ensure clients reply to a ping. - """ - daemon = client.ensure_client_daemon() - port = daemon.get_port() - request = PingRequest() - service = RpcService(EventsClientService_Stub, port, 'localhost') - response = service.ping(request, timeout=1000) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_server_ping(self): - """ - Ensure the function from server module pings correctly. - """ - response = server.ping() - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_client_ping(self): - """ - Ensure the function from client module pings correctly. - """ - daemon = client.ensure_client_daemon() - response = client.ping(daemon.get_port()) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_module_ping_server(self): - """ - Ensure the function from main module pings server correctly. - """ - response = events.ping_server() - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_module_ping_client(self): - """ - Ensure the function from main module pings clients correctly. - """ - daemon = client.ensure_client_daemon() - response = events.ping_client(daemon.get_port()) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_ensure_server_raises_if_port_taken(self): - """ - Verify that server raises an exception if port is already taken. - """ - # get a random free port - while True: - port = random.randint(1024, 65535) - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', port)) - s.close() - except: - break - - class PortBlocker(threading.Thread): - - def run(self): - conns = 0 - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind(('localhost', port)) - s.setblocking(1) - s.listen(1) - while conns < 2: # blocks until rece - conns += 1 - s.accept() - s.close() - - # block the port - taker = PortBlocker() - taker.start() - time.sleep(1) # wait for thread to start. - self.assertRaises( - server.PortAlreadyTaken, server.ensure_server, port) - - def test_async_register(self): - """ - Test asynchronous registering of callbacks. - """ - flag = Mock() + event = catalog.CLIENT_UID + d = defer.Deferred() + # register one callback that would fail + uid = self._client.register(event, lambda ev, _: d.errback(None)) + # register one callback that will succeed + self._client.register(event, lambda ev, _: d.callback(None)) + # unregister by uid and emit the event + self._client.unregister(event, uid=uid) + self._client.emit(event, None) + return d - # executed after async register, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) - # callback registered by application - def callback(request): - pass +class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase): - # passing a callback as reqcbk param makes the call asynchronous - result = events.register(CLIENT_UID, callback, reqcbk=reqcbk) - self.assertIsNone(result) - events.signal(CLIENT_UID) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) + _client = txclient - def test_async_signal(self): - """ - Test asynchronous registering of callbacks. - """ - flag = Mock() - - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) - - # passing a callback as reqcbk param makes the call asynchronous - result = events.signal(CLIENT_UID, reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) - - def test_async_unregister(self): - """ - Test asynchronous unregistering of callbacks. - """ - flag = Mock() - - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) - - # callback registered by application - def callback(request): - pass - - # passing a callback as reqcbk param makes the call asynchronous - events.register(CLIENT_UID, callback) - result = events.unregister(CLIENT_UID, reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) - - def test_async_ping_server(self): - """ - Test asynchronous pinging of server. - """ - flag = Mock() - - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(response.status) - - result = events.ping_server(reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for response to arrive from server. - flag.assert_called_once_with(EventResponse.OK) - - def test_async_ping_client(self): - """ - Test asynchronous pinging of client. - """ - flag = Mock() - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(response.status) +class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase): - daemon = client.ensure_client_daemon() - result = events.ping_client(daemon.get_port(), reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for response to arrive from server. - flag.assert_called_once_with(EventResponse.OK) + _client = client diff --git a/src/leap/common/zmq_utils.py b/src/leap/common/zmq_utils.py new file mode 100644 index 0000000..19625b9 --- /dev/null +++ b/src/leap/common/zmq_utils.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# zmq.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Utilities to handle ZMQ certificates. +""" +import os +import logging +import stat +import shutil + +import zmq + +try: + import zmq.auth +except ImportError: + pass + +from leap.common.files import mkdir_p +from leap.common.check import leap_assert + +logger = logging.getLogger(__name__) + + +KEYS_PREFIX = "zmq_certificates" +PUBLIC_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "public_keys") +PRIVATE_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "private_keys") + + +def zmq_has_curve(): + """ + Return whether the current ZMQ has support for auth and CurveZMQ security. + + :rtype: bool + + Version notes: + `zmq.curve_keypair()` is new in version 14.0, new in version libzmq-4.0. + Requires libzmq (>= 4.0) to have been linked with libsodium. + `zmq.auth` module is new in version 14.1 + `zmq.has()` is new in version 14.1, new in version libzmq-4.1. + """ + zmq_version = zmq.zmq_version_info() + pyzmq_version = zmq.pyzmq_version_info() + + if pyzmq_version >= (14, 1, 0) and zmq_version >= (4, 1): + return zmq.has('curve') + + if pyzmq_version < (14, 1, 0): + return False + + if zmq_version < (4, 0): + # security is new in libzmq 4.0 + return False + + try: + zmq.curve_keypair() + except zmq.error.ZMQError: + # security requires libzmq to be linked against libsodium + return False + + return True + + +def assert_zmq_has_curve(): + leap_assert(zmq_has_curve, "CurveZMQ not supported!") + + +def maybe_create_and_get_certificates(basedir, name): + """ + Generate the needed ZMQ certificates for backend/frontend communication if + needed. + """ + assert_zmq_has_curve() + private_keys_dir = os.path.join(basedir, PRIVATE_KEYS_PREFIX) + private_key = os.path.join( + private_keys_dir, name + ".key_secret") + if not os.path.isfile(private_key): + mkdir_p(private_keys_dir) + zmq.auth.create_certificates(private_keys_dir, name) + # set permissions to: 0700 (U:rwx G:--- O:---) + os.chmod(private_key, stat.S_IRUSR | stat.S_IWUSR) + # move public key to public keys directory + public_keys_dir = os.path.join(basedir, PUBLIC_KEYS_PREFIX) + old_public_key = os.path.join( + private_keys_dir, name + ".key") + new_public_key = os.path.join( + public_keys_dir, name + ".key") + mkdir_p(public_keys_dir) + shutil.move(old_public_key, new_public_key) + return zmq.auth.load_certificate(private_key) + + |