summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2015-02-04 15:04:10 -0200
committerdrebs <drebs@leap.se>2015-05-27 14:37:27 -0300
commit514c1434a016b09d93e8dfc5578b14825d14005a (patch)
treec4bacce1df24a81b2de3d1343dac26eb56e30ac7 /src
parent71c750ef9c3e53ef416d1de6e85458f16ca48d74 (diff)
[feat] refactor events to use ZMQ
Before this commit, protobuf and protobuf.socketrpc were used to serialize and transmit messages between events clients. This change implements a simpler ZMQ client/server events mechanism that uses ZMQ sockets for transmitting messages from clients to server and to redistribute such messages to subscribed clients. Closes: #6359
Diffstat (limited to 'src')
-rw-r--r--src/leap/common/events/Makefile31
-rw-r--r--src/leap/common/events/README.rst117
-rw-r--r--src/leap/common/events/__init__.py315
-rw-r--r--src/leap/common/events/catalog.py88
-rw-r--r--src/leap/common/events/client.py712
-rw-r--r--src/leap/common/events/daemon.py208
-rw-r--r--src/leap/common/events/errors.py (renamed from src/leap/common/events/mac_auth.py)18
-rw-r--r--src/leap/common/events/events.proto145
-rw-r--r--src/leap/common/events/events_pb2.py635
-rw-r--r--src/leap/common/events/server.py242
-rw-r--r--src/leap/common/events/txclient.py185
-rw-r--r--src/leap/common/events/zmq_components.py179
-rw-r--r--src/leap/common/tests/test_events.py468
-rw-r--r--src/leap/common/zmq_utils.py105
14 files changed, 1420 insertions, 2028 deletions
diff --git a/src/leap/common/events/Makefile b/src/leap/common/events/Makefile
deleted file mode 100644
index 5b7e60d..0000000
--- a/src/leap/common/events/Makefile
+++ /dev/null
@@ -1,31 +0,0 @@
-# -*- coding: utf-8 -*-
-# Makefile
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-# This file is used to generate protobuf python files that are used for IPC:
-#
-# https://developers.google.com/protocol-buffers/docs/pythontutorial
-
-PROTOC = protoc
-
-all: events_pb2.py
-
-%_pb2.py: %.proto
- $(PROTOC) --python_out=./ $<
-# autopep8 --in-place --aggressive $@
-
-clean:
- rm -f *_pb2.py
diff --git a/src/leap/common/events/README.rst b/src/leap/common/events/README.rst
index 2e7f254..f455cc8 100644
--- a/src/leap/common/events/README.rst
+++ b/src/leap/common/events/README.rst
@@ -1,19 +1,83 @@
Events mechanism
================
-The events mechanism allows for clients to send signal events to each
-other by means of a centralized server. Clients can register with the
-server to receive signals of certain types, and they can also send signals to
-the server that will then redistribute these signals to registered clients.
+The events mechanism allows for clients to send events to each other by means
+of a centralized server. Clients can register with the server to receive
+events of certain types, and they can also send events to the server that will
+then redistribute these events to registered clients.
-Listening daemons
------------------
+ZMQ connections and events redistribution
+-----------------------------------------
-Both clients and the server listen for incoming messages by using a
-listening daemon that runs in its own thread. The server daemon has to be
-started explicitly, while clients daemon will be started whenever a
-client registers with the server to receive messages.
+Clients and server use ZMQ connection patterns to communicate. Clients can
+push events to the server, and may subscribe to events published by the
+server. The server in turn pulls events from clients and publishes them to
+subscribed clients.
+
+Clients connect to the server's zmq pub socket, and register to specific
+events indicating which callbacks should be executed when that event is
+received:
+
+
+ EventsServer
+ .------------.
+ |PULL PUB|
+ '------------'
+ ^^
+ ||
+ reg(1, cbk1) |'--------------. reg(2, cbk2)
+ | |
+ | |
+ .------------. .------------. .------------.
+ |PUSH SUB| |PUSH SUB| |PUSH SUB|
+ '------------' '------------' '------------'
+ EventsClient EventsClient EventsClient
+
+
+A client that wants to send an event connects to the server's zmq pull socket
+and pushes the event to the server. The server then redistributes it to all
+clients subscribed to that event.
+
+
+ EventsServer
+ .------------.
+ |PULL---->PUB|
+ '------------'
+ ^ |.
+ | |.
+sig(1, 'foo') .----------------' |'...............
+ | | .
+ | v .
+ .------------. .------------. .------------.
+ |PUSH SUB| |PUSH SUB| |PUSH SUB|
+ '------------' '------------' '------------'
+ EventsClient EventsClient EventsClient
+ |
+ v
+ cbk1(1, 'foo')
+
+
+Any client may emit or subscribe to an event. ZMQ will manage sockets and
+reuse the connection whenever it can.
+
+
+ EventsServer
+ .------------.
+ |PULL---->PUB|
+ '------------'
+ ^ .|
+ | .|
+sig(2, 'bar') .-----------------' .'--------------.
+ | . |
+ | . v
+ .------------. .------------. .------------.
+ |PUSH SUB| |PUSH SUB| |PUSH SUB|
+ '------------' '------------' '------------'
+ EventsClient EventsClient EventsClient
+ |
+ v
+ cbk2(2, 'bar')
How to use it
@@ -22,32 +86,27 @@ How to use it
To start the events server:
>>> from leap.common.events import server
->>> server.ensure_server(port=8090)
+>>> server.ensure_server(
+ emit_addr="tcp://127.0.0.1:9000",
+ reg_addr="tcp://127.0.0.1:9001")
-To register a callback to be called when a given signal is raised:
+To register a callback to be called when a given event is raised:
->>> from leap.common.events import (
->>> register,
->>> events_pb2 as proto,
->>> )
+>>> from leap.common.events import register
+>>> from leap.common.events import catalog
>>>
->>> def mycallback(sigreq):
->>> print str(sigreq)
+>>> def mycbk(event, *content):
+>>> print "%s, %s" (str(event), str(content))
>>>
->>> events.register(signal=proto.CLIENT_UID, callback=mycallback)
+>>> register(catalog.CLIENT_UID, callback=mycbk)
-To signal an event:
+To emit an event:
->>> from leap.common.events import (
->>> signal,
->>> events_pb2 as proto,
->>> )
->>> signal(proto.CLIENT_UID)
+>>> from leap.common.events import emit
+>>> from leap.common.events import catalog
+>>> emit(catalog.CLIENT_UID)
Adding events
-------------
-* Add the new event under enum ``Event`` in ``events.proto``
-* Compile the new protocolbuffers file::
-
- make
+To add a new event, just add it to ``catalog.py``.
diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py
index 0cc6573..9269b9a 100644
--- a/src/leap/common/events/__init__.py
+++ b/src/leap/common/events/__init__.py
@@ -15,188 +15,199 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
"""
-This is an events mechanism that uses a server to allow for signaling of
-events between clients.
+This is an events mechanism that uses a server to allow for emitting events
+between clients.
Application components should use the interface available in this file to
-register callbacks to be executed upon receiving specific signals, and to send
-signals to other components.
+register callbacks to be executed upon receiving specific events, and to send
+events to other components.
-To register a callback to be executed when a specific event is signaled, use
+To register a callback to be executed when a specific event is emitted, use
leap.common.events.register():
>>> from leap.common.events import register
->>> from leap.common.events import events_pb2 as proto
->>> register(proto.CLIENT_UID, lambda req: do_something(req))
-
-To signal an event, use leap.common.events.signal():
-
->>> from leap.common.events import signal
->>> from leap.common.events import events_pb2 as proto
->>> signal(proto.CLIENT_UID)
-
-
-NOTE ABOUT SYNC/ASYNC REQUESTS:
-
-Clients always communicate with the server, and never between themselves.
-When a client registers a callback for an event, the callback is saved locally
-in the client and the server stores the client socket port in a list
-associated with that event. When a client signals an event, the server
-forwards the signal to all registered client ports, and then each client
-executes its callbacks associated with that event locally.
-
-Each RPC call from a client to the server is followed by a response from the
-server to the client. Calls to register() and signal() (and all other RPC
-calls) can be synchronous or asynchronous meaning if they will or not wait for
-the server's response before returning.
-
-This mechanism was built on top of protobuf.socketrpc, and because of this RPC
-calls are made synchronous or asynchronous in the following way:
-
- * If RPC calls receive a parameter called `reqcbk`, then the call is made
- asynchronous. That means that:
+>>> from leap.common.events import catalog
+>>> register(catalog.CLIENT_UID, lambda sig, content: do_something(content))
- - an eventual `timeout` parameter is not used,
- - the call returns immediatelly with value None, and
- - the `reqcbk` callback is executed asynchronously upon the arrival of
- a response from the server.
+To emit an event, use leap.common.events.emit():
- * Otherwise, if the `reqcbk` parameter is None, then the call is made in a
- synchronous manner:
-
- - if a response from server arrives within `timeout` milliseconds, the
- RPC call returns it;
- - otherwise, the call returns None.
+>>> from leap.common.events import emit
+>>> from leap.common.events import catalog
+>>> emit(catalog.CLIENT_UID)
"""
+
import logging
-import socket
+import argparse
+
+from leap.common.events import client
+from leap.common.events import server
+from leap.common.events import catalog
-from leap.common.events import (
- events_pb2 as proto,
- server,
- client,
- daemon,
-)
+__all__ = [
+ "register",
+ "unregister",
+ "emit",
+ "catalog",
+]
logger = logging.getLogger(__name__)
-def register(signal, callback, uid=None, replace=False, reqcbk=None,
- timeout=1000):
+def register(event, callback, uid=None, replace=False):
"""
- Register a callback to be called when the given signal is received.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param callback: the callback to be called when the signal is received
- :type callback: function
- :param uid: a unique id for the callback
- :type uid: int
- :param replace: should an existent callback with same uid be replaced?
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param callback: The callback to be executed.
+ :type callback: callable(event, content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
:type replace: bool
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- return client.register(signal, callback, uid, replace, reqcbk, timeout)
+ :return: The callback uid.
+ :rtype: str
-def unregister(signal, uid=None, reqcbk=None, timeout=1000):
- """
- Unregister a callback.
-
- If C{uid} is specified, unregisters only the callback identified by that
- unique id. Otherwise, unregisters all callbacks registered for C{signal}.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param uid: a unique id for the callback
- :type uid: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
+ :raises CallbackAlreadyRegistered: when there's already a callback
+ identified by the given uid and replace is False.
"""
- return client.unregister(signal, uid, reqcbk, timeout)
+ return client.register(event, callback, uid, replace)
-def signal(signal, content="", mac_method="", mac="", reqcbk=None,
- timeout=1000):
+def unregister(event, uid=None):
"""
- Send `signal` event to events server.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param content: the contents of the event signal
- :type content: str
- :param mac_method: the method used to auth mac
- :type mac_method: str
- :param mac: the content of the auth mac
- :type mac: str
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.SignalRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- return client.signal(signal, content, mac_method, mac, reqcbk, timeout)
+ Unregister callbacks for an event.
-def ping_client(port, reqcbk=None, timeout=1000):
- """
- Ping a client running in C{port}.
-
- :param port: the port in which the client should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from client is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
+ If uid is not None, then only the callback identified by the given uid is
+ removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param uid: The callback uid.
+ :type uid: str
"""
- return client.ping(port, reqcbk=reqcbk, timeout=timeout)
+ return client.unregister(event, uid)
-def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000):
+def emit(event, *content):
"""
- Ping the server.
-
- :param port: the port in which server should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
"""
- return server.ping(port, reqcbk=reqcbk, timeout=timeout)
+ return client.emit(event, *content)
+
+
+if __name__ == "__main__":
+
+ def _echo(event, *content):
+ print "Received event: (%s, %s)" % (event, content)
+
+ def _parse_args():
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--debug", "-d", action="store_true",
+ help="print debug information")
+
+ subparsers = parser.add_subparsers(dest="command")
+
+ # server options
+ server_parser = subparsers.add_parser(
+ "server", help="Run an events server.")
+ server_parser.add_argument(
+ "--emit-addr",
+ help="The address in which to listen for events",
+ default=server.EMIT_ADDR)
+ server_parser.add_argument(
+ "--reg-addr",
+ help="The address in which to listen for registration for events.",
+ default=server.REG_ADDR)
+
+ # client options
+ client_parser = subparsers.add_parser(
+ "client", help="Run an events client.")
+ client_parser.add_argument(
+ "--emit-addr",
+ help="The address in which to emit events.",
+ default=server.EMIT_ADDR)
+ client_parser.add_argument(
+ "--reg-addr",
+ help="The address in which to register for events.",
+ default=server.REG_ADDR)
+ group = client_parser.add_mutually_exclusive_group(required=True)
+ group.add_argument('--reg', help="register an event")
+ group.add_argument('--emit', help="send an event")
+ client_parser.add_argument(
+ '--content', help="the content of the event", default=None)
+
+ # txclient options
+ txclient_parser = subparsers.add_parser(
+ "txclient", help="Run an events twisted client.")
+ txclient_parser.add_argument(
+ "--emit-addr",
+ help="The address in which to emit events.",
+ default=server.EMIT_ADDR)
+ txclient_parser.add_argument(
+ "--reg-addr",
+ help="The address in which to register for events.",
+ default=server.REG_ADDR)
+ group = txclient_parser.add_mutually_exclusive_group(required=True)
+ group.add_argument('--reg', help="register an event")
+ group.add_argument('--emit', help="send an event")
+ txclient_parser.add_argument(
+ '--content', help="the content of the event", default=None)
+
+ return parser.parse_args()
+
+ args = _parse_args()
+
+ if args.debug:
+ logging.basicConfig(level=logging.DEBUG)
+
+ if args.command == "server":
+ # run server
+ server.ensure_server(emit_addr=args.emit_addr, reg_addr=args.reg_addr)
+ from twisted.internet import reactor
+ reactor.run()
+ elif args.command == "client":
+ if args.reg:
+ event = getattr(catalog, args.reg)
+ # run client and register to a signal
+ register(event, _echo)
+ # make sure we stop on CTRL+C
+ import signal
+ signal.signal(
+ signal.SIGINT, lambda sig, frame: client.shutdown())
+ # wait until client thread dies
+ import time
+ while client.EventsClientThread.instance().is_alive():
+ time.sleep(0.1)
+ if args.emit:
+ # run client and emit a signal
+ event = getattr(catalog, args.emit)
+ emit(event, args.content)
+ client.shutdown()
+ elif args.command == "txclient":
+ from leap.common.events import txclient
+ register = txclient.register
+ emit = txclient.emit
+ if args.reg:
+ event = getattr(catalog, args.reg)
+ # run client and register to a signal
+ register(event, _echo)
+ from twisted.internet import reactor
+ reactor.run()
+ if args.emit:
+ # run client and emit a signal
+ event = getattr(catalog, args.emit)
+ emit(event, args.content)
diff --git a/src/leap/common/events/catalog.py b/src/leap/common/events/catalog.py
new file mode 100644
index 0000000..8bddd2c
--- /dev/null
+++ b/src/leap/common/events/catalog.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+# catalog.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful",
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Events catalog.
+"""
+
+
+EVENTS = [
+ "CLIENT_SESSION_ID",
+ "CLIENT_UID",
+ "IMAP_CLIENT_LOGIN",
+ "IMAP_SERVICE_FAILED_TO_START",
+ "IMAP_SERVICE_STARTED",
+ "IMAP_UNHANDLED_ERROR",
+ "KEYMANAGER_DONE_UPLOADING_KEYS",
+ "KEYMANAGER_FINISHED_KEY_GENERATION",
+ "KEYMANAGER_KEY_FOUND",
+ "KEYMANAGER_KEY_NOT_FOUND",
+ "KEYMANAGER_LOOKING_FOR_KEY",
+ "KEYMANAGER_STARTED_KEY_GENERATION",
+ "MAIL_FETCHED_INCOMING",
+ "MAIL_MSG_DECRYPTED",
+ "MAIL_MSG_DELETED_INCOMING",
+ "MAIL_MSG_PROCESSING",
+ "MAIL_MSG_SAVED_LOCALLY",
+ "MAIL_UNREAD_MESSAGES",
+ "RAISE_WINDOW",
+ "SMTP_CONNECTION_LOST",
+ "SMTP_END_ENCRYPT_AND_SIGN",
+ "SMTP_END_SIGN",
+ "SMTP_RECIPIENT_ACCEPTED_ENCRYPTED",
+ "SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED",
+ "SMTP_RECIPIENT_REJECTED",
+ "SMTP_SEND_MESSAGE_ERROR",
+ "SMTP_SEND_MESSAGE_START",
+ "SMTP_SEND_MESSAGE_SUCCESS",
+ "SMTP_SERVICE_FAILED_TO_START",
+ "SMTP_SERVICE_STARTED",
+ "SMTP_START_ENCRYPT_AND_SIGN",
+ "SMTP_START_SIGN",
+ "SOLEDAD_CREATING_KEYS",
+ "SOLEDAD_DONE_CREATING_KEYS",
+ "SOLEDAD_DONE_DATA_SYNC",
+ "SOLEDAD_DONE_DOWNLOADING_KEYS",
+ "SOLEDAD_DONE_UPLOADING_KEYS",
+ "SOLEDAD_DOWNLOADING_KEYS",
+ "SOLEDAD_INVALID_AUTH_TOKEN",
+ "SOLEDAD_NEW_DATA_TO_SYNC",
+ "SOLEDAD_SYNC_RECEIVE_STATUS",
+ "SOLEDAD_SYNC_SEND_STATUS",
+ "SOLEDAD_UPLOADING_KEYS",
+ "UPDATER_DONE_UPDATING",
+ "UPDATER_NEW_UPDATES",
+]
+
+
+class Event(object):
+
+ def __init__(self, label):
+ self.label = label
+
+ def __repr__(self):
+ return '<Event: %s>' % self.label
+
+ def __str__(self):
+ return self.label
+
+
+# create local variables based on the event list above
+lcl = locals()
+for event in EVENTS:
+ lcl[event] = Event(event)
diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py
index 83f18e0..6b234a1 100644
--- a/src/leap/common/events/client.py
+++ b/src/leap/common/events/client.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# client.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014, 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,6 +14,8 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
"""
The client end point of the events mechanism.
@@ -21,309 +23,501 @@ Clients are the communicating parties of the events mechanism. They
communicate by sending messages to a server, which in turn redistributes
messages to other clients.
-When a client registers a callback for a given signal, it also tells the
-server that it wants to be notified whenever signals of that type are sent by
+When a client registers a callback for a given event, it also tells the
+server that it wants to be notified whenever events of that type are sent by
some other client.
"""
-import logging
+import logging
+import collections
+import uuid
+import threading
+import time
+import pickle
+import os
+
+from abc import ABCMeta
+from abc import abstractmethod
+
+import zmq
+from zmq.eventloop import zmqstream
+from zmq.eventloop import ioloop
+
+# XXX some distros don't package libsodium, so we have to be prepared for
+# absence of zmq.auth
+try:
+ import zmq.auth
+except ImportError:
+ pass
-from protobuf.socketrpc import RpcService
+from leap.common.config import get_path_prefix
+from leap.common.zmq_utils import zmq_has_curve
+from leap.common.zmq_utils import maybe_create_and_get_certificates
+from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
-from leap.common.events import events_pb2 as proto
-from leap.common.events import server
-from leap.common.events import daemon
-from leap.common.events import mac_auth
+from leap.common.events.errors import CallbackAlreadyRegisteredError
+from leap.common.events.server import EMIT_ADDR
+from leap.common.events.server import REG_ADDR
+from leap.common.events import catalog
logger = logging.getLogger(__name__)
-# the `registered_callbacks` dictionary below should have the following
-# format:
-#
-# { event_signal: [ (uid, callback), ... ], ... }
-#
-registered_callbacks = {}
+_emit_addr = EMIT_ADDR
+_reg_addr = REG_ADDR
+
+def configure_client(emit_addr, reg_addr):
+ global _emit_addr, _reg_addr
+ logger.debug("Configuring client with addresses: (%s, %s)" %
+ (emit_addr, reg_addr))
+ _emit_addr = emit_addr
+ _reg_addr = reg_addr
-class CallbackAlreadyRegistered(Exception):
+
+class EventsClient(object):
"""
- Raised when trying to register an already registered callback.
+ A singleton client for the events mechanism.
"""
- pass
+ __metaclass__ = ABCMeta
-def ensure_client_daemon():
- """
- Ensure the client daemon is running and listening for incoming
- messages.
+ _instance = None
+ _instance_lock = threading.Lock()
- :return: the daemon instance
- :rtype: EventsClientDaemon
- """
- import time
- daemon = EventsClientDaemon.ensure(0)
- logger.debug('ensure client daemon')
+ def __init__(self, emit_addr, reg_addr):
+ """
+ Initialize the events client.
+ """
+ logger.debug("Creating client instance.")
+ self._callbacks = collections.defaultdict(dict)
+ self._emit_addr = emit_addr
+ self._reg_addr = reg_addr
+
+ @property
+ def callbacks(self):
+ return self._callbacks
- # Because we use a random port we want to wait until a port is assigned to
- # local client daemon.
+ @classmethod
+ def instance(cls):
+ """
+ Return a singleton EventsClient instance.
+ """
+ with cls._instance_lock:
+ if cls._instance is None:
+ cls._instance = cls(_emit_addr, _reg_addr)
+ return cls._instance
- while not (EventsClientDaemon.get_instance() and
- EventsClientDaemon.get_instance().get_port()):
- time.sleep(0.1)
- return daemon
+ def register(self, event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param callback: The callback to be executed.
+ :type callback: callable(event, *content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a callback
+ identified by the given uid and replace is False.
+ """
+ logger.debug("Subscribing to event: %s" % event)
+ if not uid:
+ uid = uuid.uuid4()
+ elif uid in self._callbacks[event] and not replace:
+ raise CallbackAlreadyRegisteredError()
+ self._callbacks[event][uid] = callback
+ self._subscribe(str(event))
+ return uid
+
+ def unregister(self, event, uid=None):
+ """
+ Unregister callbacks for an event.
+ If uid is not None, then only the callback identified by the given uid
+ is removed. Otherwise, all callbacks for the event are removed.
-def register(signal, callback, uid=None, replace=False, reqcbk=None,
- timeout=1000):
- """
- Registers a callback to be called when a specific signal event is
- received.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param callback: the callback to be called when the signal is received
- :type callback: function(leap.common.events.events_pb2.SignalRequest)
- :param uid: a unique id for the callback
- :type uid: int
- :param replace: should an existent callback with same uid be replaced?
- :type replace: bool
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.RegisterRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- Might raise a CallbackAlreadyRegistered exception if there's already a
- callback identified by the given uid and replace is False.
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- ensure_client_daemon() # so we can receive registered signals
- # register callback locally
- if signal not in registered_callbacks:
- registered_callbacks[signal] = []
- cbklist = registered_callbacks[signal]
-
- # TODO should check that the callback has the right
- # number of arguments.
-
- if uid and filter(lambda (x, y): x == uid, cbklist):
- if not replace:
- raise CallbackAlreadyRegistered()
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ if not uid:
+ logger.debug(
+ "Unregistering all callbacks from event %s." % event)
+ self._callbacks[event] = {}
else:
- registered_callbacks[signal] = filter(lambda(x, y): x != uid,
- cbklist)
- registered_callbacks[signal].append((uid, callback))
- # register callback on server
- request = proto.RegisterRequest()
- request.event = signal
- request.port = EventsClientDaemon.get_instance().get_port()
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(proto.EventsServerService_Stub,
- server.SERVER_PORT, 'localhost')
- logger.debug(
- "Sending registration request to server on port %s: %s",
- server.SERVER_PORT,
- str(request)[:40])
- return service.register(request, callback=reqcbk, timeout=timeout)
-
-
-def unregister(signal, uid=None, reqcbk=None, timeout=1000):
- """
- Unregister a callback.
-
- If C{uid} is specified, unregisters only the callback identified by that
- unique id. Otherwise, unregisters all callbacks
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param uid: a unique id for the callback
- :type uid: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls or None if no callback is registered for that signal or
- uid.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- if signal not in registered_callbacks or not registered_callbacks[signal]:
- logger.warning("No callback registered for signal %d." % signal)
- return None
- # unregister callback locally
- cbklist = registered_callbacks[signal]
- if uid is not None:
- if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []:
- logger.warning("No callback registered for uid %d." % st)
- return None
- registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist)
- else:
- # exclude all callbacks for given signal
- registered_callbacks[signal] = []
- # unregister port in server if there are no more callbacks for this signal
- if not registered_callbacks[signal]:
- request = proto.UnregisterRequest()
- request.event = signal
- request.port = EventsClientDaemon.get_instance().get_port()
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(proto.EventsServerService_Stub,
- server.SERVER_PORT, 'localhost')
- logger.info(
- "Sending unregistration request to server on port %s: %s",
- server.SERVER_PORT,
- str(request)[:40])
- return service.unregister(request, callback=reqcbk, timeout=timeout)
-
-
-def signal(signal, content="", mac_method="", mac="", reqcbk=None,
- timeout=1000):
- """
- Send `signal` event to events server.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param content: the contents of the event signal
- :type content: str
- :param mac_method: the method used for auth mac
- :type mac_method: str
- :param mac: the content of the auth mac
- :type mac: str
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.SignalRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- request = proto.SignalRequest()
- request.event = signal
- request.content = content
- request.mac_method = mac_method
- request.mac = mac
- service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT,
- 'localhost')
- logger.debug("Sending signal to server: %s", str(request)[:40])
- return service.signal(request, callback=reqcbk, timeout=timeout)
-
-
-def ping(port, reqcbk=None, timeout=1000):
+ logger.debug(
+ "Unregistering callback %s from event %s." % (uid, event))
+ if uid in self._callbacks[event]:
+ del self._callbacks[event][uid]
+ if not self._callbacks[event]:
+ del self._callbacks[event]
+ self._unsubscribe(str(event))
+
+ def emit(self, event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ logger.debug("Sending event: (%s, %s)" % (event, content))
+ self._send(str(event) + b'\0' + pickle.dumps(content))
+
+ def _handle_event(self, event, content):
+ """
+ Handle an incoming event.
+
+ :param msg: The incoming message.
+ :type msg: list(str)
+ """
+ logger.debug("Handling event %s..." % event)
+ for uid in self._callbacks[event]:
+ callback = self._callbacks[event][uid]
+ logger.debug("Executing callback %s." % uid)
+ callback(event, *content)
+
+ @abstractmethod
+ def _subscribe(self, tag):
+ """
+ Subscribe to a tag on the zmq SUB socket.
+
+ :param tag: The tag to be subscribed.
+ :type tag: str
+ """
+ pass
+
+ @abstractmethod
+ def _unsubscribe(self, tag):
+ """
+ Unsubscribe from a tag on the zmq SUB socket.
+
+ :param tag: The tag to be unsubscribed.
+ :type tag: str
+ """
+ pass
+
+ @abstractmethod
+ def _send(self, data):
+ """
+ Send data through PUSH socket.
+
+ :param data: The data to be sent.
+ :type event: str
+ """
+ pass
+
+ def shutdown(self):
+ self.__class__.reset()
+
+ @classmethod
+ def reset(cls):
+ with cls._instance_lock:
+ cls._instance = None
+
+
+class EventsIOLoop(ioloop.ZMQIOLoop):
"""
- Ping a client running in C{port}.
-
- :param port: the port in which the client should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from client is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from client for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
+ An extension of zmq's ioloop that can wait until there are no callbacks
+ in the queue before stopping.
"""
- request = proto.PingRequest()
- service = RpcService(
- proto.EventsClientService_Stub,
- port,
- 'localhost')
- logger.debug("Pinging a client in port %d..." % port)
- return service.ping(request, callback=reqcbk, timeout=timeout)
+ def stop(self, wait=False):
+ """
+ Stop the I/O loop.
-class EventsClientService(proto.EventsClientService):
+ :param wait: Whether we should wait for callbacks in queue to finish
+ before stopping.
+ :type wait: bool
+ """
+ if wait:
+ # prevent new callbacks from being added
+ with self._callback_lock:
+ self._closing = True
+ # wait until all callbacks have been executed
+ while self._callbacks:
+ time.sleep(0.1)
+ ioloop.ZMQIOLoop.stop(self)
+
+
+class EventsClientThread(threading.Thread, EventsClient):
"""
- Service for receiving signal events in clients.
+ A threaded version of the events client.
"""
- def __init__(self):
- proto.EventsClientService.__init__(self)
+ def __init__(self, emit_addr, reg_addr):
+ """
+ Initialize the events client.
+ """
+ threading.Thread.__init__(self)
+ EventsClient.__init__(self, emit_addr, reg_addr)
+ self._lock = threading.Lock()
+ self._initialized = threading.Event()
+ self._config_prefix = os.path.join(
+ get_path_prefix(), "leap", "events")
+ self._loop = None
+ self._context = None
+ self._push = None
+ self._sub = None
+
+ def _init_zmq(self):
+ """
+ Initialize ZMQ connections.
+ """
+ self._loop = EventsIOLoop()
+ self._context = zmq.Context()
+ # connect SUB first, otherwise we might miss some event sent from this
+ # same client
+ self._sub = self._zmq_connect_sub()
+ self._push = self._zmq_connect_push()
+
+ def _zmq_connect(self, socktype, address):
+ """
+ Connect to an address using with a zmq socktype.
+
+ :param socktype: The ZMQ socket type.
+ :type socktype: int
+ :param address: The address to connect to.
+ :type address: str
- def signal(self, controller, request, done):
+ :return: A ZMQ connection stream.
+ :rtype: ZMQStream
"""
- Receive a signal and run callbacks registered for that signal.
+ logger.debug("Connecting %s to %s." % (socktype, address))
+ socket = self._context.socket(socktype)
+ # configure curve authentication
+ if zmq_has_curve():
+ public, private = maybe_create_and_get_certificates(
+ self._config_prefix, "client")
+ server_public_file = os.path.join(
+ self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
+ server_public, _ = zmq.auth.load_certificate(server_public_file)
+ socket.curve_publickey = public
+ socket.curve_secretkey = private
+ socket.curve_serverkey = server_public
+ stream = zmqstream.ZMQStream(socket, self._loop)
+ socket.connect(address)
+ return stream
+
+ def _zmq_connect_push(self):
+ """
+ Initialize the client's PUSH connection.
- This method is called whenever a signal request is received from
- server.
+ :return: A ZMQ connection stream.
+ :rtype: ZMQStream
+ """
+ return self._zmq_connect(zmq.PUSH, self._emit_addr)
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.SignalRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
+ def _zmq_connect_sub(self):
"""
- logger.debug('Received signal from server: %s...' % str(request)[:40])
+ Initialize the client's SUB connection.
- # run registered callbacks
- # TODO: verify authentication using mac in incoming message
- if request.event in registered_callbacks:
- for (_, cbk) in registered_callbacks[request.event]:
- # callbacks should be prepared to receive a
- # events_pb2.SignalRequest.
- cbk(request)
+ :return: A ZMQ connection stream.
+ :rtype: ZMQStream
+ """
+ stream = self._zmq_connect(zmq.SUB, self._reg_addr)
+ stream.on_recv(self._on_recv)
+ return stream
- # send response back to server
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
+ def _on_recv(self, msg):
+ """
+ Handle an incoming message in the SUB socket.
- def ping(self, controller, request, done):
+ :param msg: The received message.
+ :type msg: str
"""
- Reply to a ping request.
+ ev_str, content_pickle = msg[0].split(b'\0', 1) # undo txzmq tagging
+ event = getattr(catalog, ev_str)
+ content = pickle.loads(content_pickle)
+ self._handle_event(event, content)
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
+ def _subscribe(self, tag):
"""
- logger.debug("Received ping request, sending response.")
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
+ Subscribe from a tag on the zmq SUB socket.
+ :param tag: The tag to be subscribed.
+ :type tag: str
+ """
+ self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)
-class EventsClientDaemon(daemon.EventsSingletonDaemon):
- """
- A daemon that listens for incoming events from server.
- """
- @classmethod
- def ensure(cls, port):
+ def _unsubscribe(self, tag):
+ """
+ Unsubscribe from a tag on the zmq SUB socket.
+
+ :param tag: The tag to be unsubscribed.
+ :type tag: str
+ """
+ self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)
+
+ def _send(self, data):
+ """
+ Send data through PUSH socket.
+
+ :param data: The data to be sent.
+ :type event: str
+ """
+ logger.debug("Sending data: %s" % data)
+ # add send() as a callback for ioloop so it works between threads
+ self._loop.add_callback(lambda: self._push.send(data))
+
+ def register(self, event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param callback: The callback to be executed.
+ :type callback: callable(event, *content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a
+ callback identified by the given uid and replace is False.
"""
- Make sure the daemon is running on the given port.
+ self.ensure_client()
+ return EventsClient.register(self, event, callback, uid=uid, replace=replace)
- :param port: the port in which the daemon should listen
- :type port: int
+ def unregister(self, event, uid=None):
+ """
+ Unregister callbacks for an event.
+
+ If uid is not None, then only the callback identified by the given uid
+ is removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ self.ensure_client()
+ EventsClient.unregister(self, event, uid=uid)
+
+ def emit(self, event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ self.ensure_client()
+ EventsClient.emit(self, event, *content)
+
+ def run(self):
+ """
+ Run the events client.
+ """
+ logger.debug("Starting ioloop.")
+ self._init_zmq()
+ self._initialized.set()
+ self._loop.start()
+ self._loop.close()
+ logger.debug("Ioloop finished.")
+
+ def ensure_client(self):
+ """
+ Make sure the events client thread is started.
+ """
+ with self._lock:
+ if not self.is_alive():
+ self.daemon = True
+ self.start()
+ self._initialized.wait()
- :return: a daemon instance
- :rtype: EventsClientDaemon
+ def shutdown(self):
"""
- return cls.ensure_service(port, EventsClientService())
+ Shutdown the events client thread.
+ """
+ logger.debug("Shutting down client...")
+ with self._lock:
+ if self.is_alive():
+ self._loop.stop(wait=True)
+ EventsClient.shutdown(self)
+
+
+def shutdown():
+ """
+ Shutdown the events client thread.
+ """
+ EventsClientThread.instance().shutdown()
+
+
+def register(event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param callback: The callback to be executed.
+ :type callback: callable(event, content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a callback
+ identified by the given uid and replace is False.
+ """
+ return EventsClientThread.instance().register(
+ event, callback, uid=uid, replace=replace)
+
+
+def unregister(event, uid=None):
+ """
+ Unregister callbacks for an event.
+
+ If uid is not None, then only the callback identified by the given uid is
+ removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ return EventsClientThread.instance().unregister(event, uid=uid)
+
+
+def emit(event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: str
+ :param content: The content of the event.
+ :type content: list
+ """
+ return EventsClientThread.instance().emit(event, *content)
+
+
+def instance():
+ """
+ Return an instance of the events client.
+
+ :return: An instance of the events client.
+ :rtype: EventsClientThread
+ """
+ return EventsClientThread.instance()
diff --git a/src/leap/common/events/daemon.py b/src/leap/common/events/daemon.py
deleted file mode 100644
index c4a4189..0000000
--- a/src/leap/common/events/daemon.py
+++ /dev/null
@@ -1,208 +0,0 @@
-# -*- coding: utf-8 -*-
-# daemon.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-A singleton daemon for running RPC services using protobuf.socketrpc.
-"""
-
-
-import logging
-import threading
-
-
-from protobuf.socketrpc.server import (
- SocketRpcServer,
- ThreadedTCPServer,
- SocketHandler,
-)
-
-
-logger = logging.getLogger(__name__)
-
-
-class ServiceAlreadyRunningException(Exception):
- """
- Raised whenever a service is already running in this process but someone
- attemped to start it in a different port.
- """
-
-
-class EventsRpcServer(SocketRpcServer):
- """
- RPC server used in server and client interfaces to receive messages.
- """
-
- def __init__(self, port, host='localhost'):
- """
- Initialize a RPC server.
-
- :param port: the port in which to listen for incoming messages
- :type port: int
- :param host: the address to bind to
- :type host: str
- """
- SocketRpcServer.__init__(self, port, host)
- self._server = None
-
- def run(self):
- """
- Run the server.
- """
- logger.info('Running server on port %d.' % self.port)
- # parent implementation does not hold the server instance, so we do it
- # here.
- self._server = ThreadedTCPServer((self.host, self.port),
- SocketHandler, self)
- # if we chose to use a random port, fetch the port number info.
- if self.port is 0:
- self.port = self._server.socket.getsockname()[1]
- self._server.serve_forever()
-
- def stop(self):
- """
- Stop the server.
- """
- self._server.shutdown()
-
-
-class EventsSingletonDaemon(threading.Thread):
- """
- Singleton class for for launching and terminating a daemon.
-
- This class is used so every part of the mechanism that needs to listen for
- messages can launch its own daemon (thread) to do the job.
- """
-
- # Singleton instance
- __instance = None
-
- def __new__(cls, *args, **kwargs):
- """
- Return a singleton instance if it exists or create and initialize one.
- """
- if len(args) is not 2:
- raise TypeError("__init__() takes exactly 2 arguments (%d given)"
- % len(args))
- if cls.__instance is None:
- cls.__instance = object.__new__(
- EventsSingletonDaemon)
- cls.__initialize(cls.__instance, args[0], args[1])
- return cls.__instance
-
- @staticmethod
- def __initialize(self, port, service):
- """
- Initialize a singleton daemon.
-
- This is a static method disguised as instance method that actually
- does the initialization of the daemon instance.
-
- :param port: the port in which to listen for incoming messages
- :type port: int
- :param service: the service to provide in this daemon
- :type service: google.protobuf.service.Service
- """
- threading.Thread.__init__(self)
- self._port = port
- self._service = service
- self._server = EventsRpcServer(self._port)
- self._server.registerService(self._service)
- self.daemon = True
-
- def __init__(self):
- """
- Singleton placeholder initialization method.
-
- Initialization is made in __new__ so we can always return the same
- instance upon object creation.
- """
- pass
-
- @classmethod
- def ensure(cls, port):
- """
- Make sure the daemon instance is running.
-
- Each implementation of this method should call `self.ensure_service`
- with the appropriate service from the `events.proto` definitions, and
- return the daemon instance.
-
- :param port: the port in which the daemon should be listening
- :type port: int
-
- :return: a daemon instance
- :rtype: EventsSingletonDaemon
- """
- raise NotImplementedError(self.ensure)
-
- @classmethod
- def ensure_service(cls, port, service):
- """
- Start the singleton instance if not already running.
-
- Might return ServiceAlreadyRunningException
-
- :param port: the port in which the daemon should be listening
- :type port: int
-
- :return: a daemon instance
- :rtype: EventsSingletonDaemon
- """
- daemon = cls(port, service)
- if not daemon.is_alive():
- daemon.start()
- elif port and port != cls.__instance._port:
- # service is running in this process but someone is trying to
- # start it in another port
- raise ServiceAlreadyRunningException(
- "Service is already running in this process on port %d."
- % self.__instance._port)
- return daemon
-
- @classmethod
- def get_instance(cls):
- """
- Retrieve singleton instance of this daemon.
-
- :return: a daemon instance
- :rtype: EventsSingletonDaemon
- """
- return cls.__instance
-
- def run(self):
- """
- Run the server.
- """
- self._server.run()
-
- def stop(self):
- """
- Stop the daemon.
- """
- self._server.stop()
-
- def get_port(self):
- """
- Retrieve the value of the port to which the service running in this
- daemon is binded to.
-
- :return: the port to which the daemon is binded to
- :rtype: int
- """
- if self._port is 0:
- self._port = self._server.port
- return self._port
diff --git a/src/leap/common/events/mac_auth.py b/src/leap/common/events/errors.py
index 49d48f7..58e0014 100644
--- a/src/leap/common/events/mac_auth.py
+++ b/src/leap/common/events/errors.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# mac_auth.py
-# Copyright (C) 2013 LEAP
+# errors.py
+# Copyright (C) 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,17 +15,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Authentication system for events.
-This is not implemented yet.
-"""
-
-
-class MacMethod(object):
+class CallbackAlreadyRegisteredError(Exception):
"""
- Representation of possible MAC authentication methods.
+ Raised when trying to register an already registered callback.
"""
-
- MAC_NONE = 'none'
- MAC_HMAC = 'hmac'
+ pass
diff --git a/src/leap/common/events/events.proto b/src/leap/common/events/events.proto
deleted file mode 100644
index 2371b2a..0000000
--- a/src/leap/common/events/events.proto
+++ /dev/null
@@ -1,145 +0,0 @@
-// signal.proto
-// Copyright (C) 2013 LEAP
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package leap.common.events;
-option py_generic_services = true;
-
-
-// These are the events that can be signaled using the events mechanism.
-
-enum Event {
- CLIENT_SESSION_ID = 1;
- CLIENT_UID = 2;
- SOLEDAD_CREATING_KEYS = 3;
- SOLEDAD_DONE_CREATING_KEYS = 4;
- SOLEDAD_UPLOADING_KEYS = 5;
- SOLEDAD_DONE_UPLOADING_KEYS = 6;
- SOLEDAD_DOWNLOADING_KEYS = 7;
- SOLEDAD_DONE_DOWNLOADING_KEYS = 8;
- SOLEDAD_NEW_DATA_TO_SYNC = 9;
- SOLEDAD_DONE_DATA_SYNC = 10;
- UPDATER_NEW_UPDATES = 11;
- UPDATER_DONE_UPDATING = 12;
- RAISE_WINDOW = 13;
- SMTP_SERVICE_STARTED = 14;
- SMTP_SERVICE_FAILED_TO_START = 15;
- SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16;
- SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17;
- SMTP_RECIPIENT_REJECTED = 18;
- SMTP_START_ENCRYPT_AND_SIGN = 19;
- SMTP_END_ENCRYPT_AND_SIGN = 20;
- SMTP_START_SIGN = 21;
- SMTP_END_SIGN = 22;
- SMTP_SEND_MESSAGE_START = 23;
- SMTP_SEND_MESSAGE_SUCCESS = 24;
- SMTP_SEND_MESSAGE_ERROR = 25;
- SMTP_CONNECTION_LOST = 26;
- IMAP_SERVICE_STARTED = 30;
- IMAP_SERVICE_FAILED_TO_START = 31;
- IMAP_CLIENT_LOGIN = 32;
- IMAP_FETCHED_INCOMING = 33;
- IMAP_MSG_PROCESSING = 34;
- IMAP_MSG_DECRYPTED = 35;
- IMAP_MSG_SAVED_LOCALLY = 36;
- IMAP_MSG_DELETED_INCOMING = 37;
- IMAP_UNHANDLED_ERROR = 38;
- IMAP_UNREAD_MAIL = 39;
- KEYMANAGER_LOOKING_FOR_KEY = 40;
- KEYMANAGER_KEY_FOUND = 41;
- KEYMANAGER_KEY_NOT_FOUND = 42;
- KEYMANAGER_STARTED_KEY_GENERATION = 43;
- KEYMANAGER_FINISHED_KEY_GENERATION = 44;
- KEYMANAGER_DONE_UPLOADING_KEYS = 45;
- SOLEDAD_INVALID_AUTH_TOKEN = 46;
- SOLEDAD_SYNC_SEND_STATUS = 47;
- SOLEDAD_SYNC_RECEIVE_STATUS = 48;
-}
-
-
-// A SignalRequest is the type of the message sent from one component to request
-// that a signal be sent to every registered component.
-
-message SignalRequest {
- required Event event = 1;
- required string content = 2;
- required string mac_method = 3;
- required bytes mac = 4;
- optional string enc_method = 5;
- optional bool error_occurred = 6;
-}
-
-
-// A RegisterRequest message tells the server that a component wants to
-// be signaled whenever a specific event occurs.
-
-message RegisterRequest {
- required Event event = 1;
- required int32 port = 2;
- required string mac_method = 3;
- required bytes mac = 4;
-}
-
-
-// An UnregisterRequest message tells the server that a component does not
-// want to be signaled when a specific event occurs.
-
-message UnregisterRequest {
- required Event event = 1;
- required int32 port = 2;
- required string mac_method = 3;
- required bytes mac = 4;
-}
-
-
-// A PingRequest message is used to find out if a server or component is
-// alive.
-
-message PingRequest {
-}
-
-
-// The EventResponse is the message sent back by server and components after
-// they receive other kinds of requests.
-
-message EventResponse {
-
- enum Status {
- OK = 1;
- UNAUTH = 2;
- ERROR = 3;
- }
-
- required Status status = 1;
- optional string result = 2;
-}
-
-
-// The EventsServerService is the service provided by the server.
-
-service EventsServerService {
- rpc ping(PingRequest) returns (EventResponse);
- rpc register(RegisterRequest) returns (EventResponse);
- rpc unregister(UnregisterRequest) returns (EventResponse);
- rpc signal(SignalRequest) returns (EventResponse);
-}
-
-
-// EventsComponentService is the service provided by components (clients).
-
-service EventsClientService {
- rpc ping(PingRequest) returns (EventResponse);
- rpc signal(SignalRequest) returns (EventResponse);
-}
diff --git a/src/leap/common/events/events_pb2.py b/src/leap/common/events/events_pb2.py
deleted file mode 100644
index 9692ea1..0000000
--- a/src/leap/common/events/events_pb2.py
+++ /dev/null
@@ -1,635 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-
-from google.protobuf import descriptor
-from google.protobuf import message
-from google.protobuf import reflection
-from google.protobuf import service
-from google.protobuf import service_reflection
-from google.protobuf import descriptor_pb2
-# @@protoc_insertion_point(imports)
-
-
-
-DESCRIPTOR = descriptor.FileDescriptor(
- name='events.proto',
- package='leap.common.events',
- serialized_pb='\n\x0c\x65vents.proto\x12\x12leap.common.events\"\x97\x01\n\rSignalRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0f\n\x07\x63ontent\x18\x02 \x02(\t\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\x12\x12\n\nenc_method\x18\x05 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x06 \x01(\x08\"j\n\x0fRegisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"l\n\x11UnregisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"\r\n\x0bPingRequest\"\x82\x01\n\rEventResponse\x12\x38\n\x06status\x18\x01 \x02(\x0e\x32(.leap.common.events.EventResponse.Status\x12\x0e\n\x06result\x18\x02 \x01(\t\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03*\x9f\n\n\x05\x45vent\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\x12\x17\n\x13UPDATER_NEW_UPDATES\x10\x0b\x12\x19\n\x15UPDATER_DONE_UPDATING\x10\x0c\x12\x10\n\x0cRAISE_WINDOW\x10\r\x12\x18\n\x14SMTP_SERVICE_STARTED\x10\x0e\x12 \n\x1cSMTP_SERVICE_FAILED_TO_START\x10\x0f\x12%\n!SMTP_RECIPIENT_ACCEPTED_ENCRYPTED\x10\x10\x12\'\n#SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED\x10\x11\x12\x1b\n\x17SMTP_RECIPIENT_REJECTED\x10\x12\x12\x1f\n\x1bSMTP_START_ENCRYPT_AND_SIGN\x10\x13\x12\x1d\n\x19SMTP_END_ENCRYPT_AND_SIGN\x10\x14\x12\x13\n\x0fSMTP_START_SIGN\x10\x15\x12\x11\n\rSMTP_END_SIGN\x10\x16\x12\x1b\n\x17SMTP_SEND_MESSAGE_START\x10\x17\x12\x1d\n\x19SMTP_SEND_MESSAGE_SUCCESS\x10\x18\x12\x1b\n\x17SMTP_SEND_MESSAGE_ERROR\x10\x19\x12\x18\n\x14SMTP_CONNECTION_LOST\x10\x1a\x12\x18\n\x14IMAP_SERVICE_STARTED\x10\x1e\x12 \n\x1cIMAP_SERVICE_FAILED_TO_START\x10\x1f\x12\x15\n\x11IMAP_CLIENT_LOGIN\x10 \x12\x19\n\x15IMAP_FETCHED_INCOMING\x10!\x12\x17\n\x13IMAP_MSG_PROCESSING\x10\"\x12\x16\n\x12IMAP_MSG_DECRYPTED\x10#\x12\x1a\n\x16IMAP_MSG_SAVED_LOCALLY\x10$\x12\x1d\n\x19IMAP_MSG_DELETED_INCOMING\x10%\x12\x18\n\x14IMAP_UNHANDLED_ERROR\x10&\x12\x14\n\x10IMAP_UNREAD_MAIL\x10\'\x12\x1e\n\x1aKEYMANAGER_LOOKING_FOR_KEY\x10(\x12\x18\n\x14KEYMANAGER_KEY_FOUND\x10)\x12\x1c\n\x18KEYMANAGER_KEY_NOT_FOUND\x10*\x12%\n!KEYMANAGER_STARTED_KEY_GENERATION\x10+\x12&\n\"KEYMANAGER_FINISHED_KEY_GENERATION\x10,\x12\"\n\x1eKEYMANAGER_DONE_UPLOADING_KEYS\x10-\x12\x1e\n\x1aSOLEDAD_INVALID_AUTH_TOKEN\x10.\x12\x1c\n\x18SOLEDAD_SYNC_SEND_STATUS\x10/\x12\x1f\n\x1bSOLEDAD_SYNC_RECEIVE_STATUS\x10\x30\x32\xdd\x02\n\x13\x45ventsServerService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12R\n\x08register\x12#.leap.common.events.RegisterRequest\x1a!.leap.common.events.EventResponse\x12V\n\nunregister\x12%.leap.common.events.UnregisterRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse2\xb1\x01\n\x13\x45ventsClientService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponseB\x03\x90\x01\x01')
-
-_EVENT = descriptor.EnumDescriptor(
- name='Event',
- full_name='leap.common.events.Event',
- filename=None,
- file=DESCRIPTOR,
- values=[
- descriptor.EnumValueDescriptor(
- name='CLIENT_SESSION_ID', index=0, number=1,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='CLIENT_UID', index=1, number=2,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_CREATING_KEYS', index=2, number=3,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_DONE_CREATING_KEYS', index=3, number=4,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_UPLOADING_KEYS', index=4, number=5,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_DONE_UPLOADING_KEYS', index=5, number=6,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_DOWNLOADING_KEYS', index=6, number=7,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_DONE_DOWNLOADING_KEYS', index=7, number=8,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_NEW_DATA_TO_SYNC', index=8, number=9,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_DONE_DATA_SYNC', index=9, number=10,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='UPDATER_NEW_UPDATES', index=10, number=11,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='UPDATER_DONE_UPDATING', index=11, number=12,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='RAISE_WINDOW', index=12, number=13,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_SERVICE_STARTED', index=13, number=14,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_SERVICE_FAILED_TO_START', index=14, number=15,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_RECIPIENT_ACCEPTED_ENCRYPTED', index=15, number=16,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED', index=16, number=17,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_RECIPIENT_REJECTED', index=17, number=18,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_START_ENCRYPT_AND_SIGN', index=18, number=19,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_END_ENCRYPT_AND_SIGN', index=19, number=20,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_START_SIGN', index=20, number=21,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_END_SIGN', index=21, number=22,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_SEND_MESSAGE_START', index=22, number=23,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_SEND_MESSAGE_SUCCESS', index=23, number=24,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_SEND_MESSAGE_ERROR', index=24, number=25,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_CONNECTION_LOST', index=25, number=26,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_SERVICE_STARTED', index=26, number=30,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_SERVICE_FAILED_TO_START', index=27, number=31,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_CLIENT_LOGIN', index=28, number=32,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_FETCHED_INCOMING', index=29, number=33,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_MSG_PROCESSING', index=30, number=34,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_MSG_DECRYPTED', index=31, number=35,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_MSG_SAVED_LOCALLY', index=32, number=36,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_MSG_DELETED_INCOMING', index=33, number=37,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_UNHANDLED_ERROR', index=34, number=38,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_UNREAD_MAIL', index=35, number=39,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_LOOKING_FOR_KEY', index=36, number=40,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_KEY_FOUND', index=37, number=41,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_KEY_NOT_FOUND', index=38, number=42,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_STARTED_KEY_GENERATION', index=39, number=43,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_FINISHED_KEY_GENERATION', index=40, number=44,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_DONE_UPLOADING_KEYS', index=41, number=45,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_INVALID_AUTH_TOKEN', index=42, number=46,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_SYNC_SEND_STATUS', index=43, number=47,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_SYNC_RECEIVE_STATUS', index=44, number=48,
- options=None,
- type=None),
- ],
- containing_type=None,
- options=None,
- serialized_start=557,
- serialized_end=1868,
-)
-
-
-CLIENT_SESSION_ID = 1
-CLIENT_UID = 2
-SOLEDAD_CREATING_KEYS = 3
-SOLEDAD_DONE_CREATING_KEYS = 4
-SOLEDAD_UPLOADING_KEYS = 5
-SOLEDAD_DONE_UPLOADING_KEYS = 6
-SOLEDAD_DOWNLOADING_KEYS = 7
-SOLEDAD_DONE_DOWNLOADING_KEYS = 8
-SOLEDAD_NEW_DATA_TO_SYNC = 9
-SOLEDAD_DONE_DATA_SYNC = 10
-UPDATER_NEW_UPDATES = 11
-UPDATER_DONE_UPDATING = 12
-RAISE_WINDOW = 13
-SMTP_SERVICE_STARTED = 14
-SMTP_SERVICE_FAILED_TO_START = 15
-SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16
-SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17
-SMTP_RECIPIENT_REJECTED = 18
-SMTP_START_ENCRYPT_AND_SIGN = 19
-SMTP_END_ENCRYPT_AND_SIGN = 20
-SMTP_START_SIGN = 21
-SMTP_END_SIGN = 22
-SMTP_SEND_MESSAGE_START = 23
-SMTP_SEND_MESSAGE_SUCCESS = 24
-SMTP_SEND_MESSAGE_ERROR = 25
-SMTP_CONNECTION_LOST = 26
-IMAP_SERVICE_STARTED = 30
-IMAP_SERVICE_FAILED_TO_START = 31
-IMAP_CLIENT_LOGIN = 32
-IMAP_FETCHED_INCOMING = 33
-IMAP_MSG_PROCESSING = 34
-IMAP_MSG_DECRYPTED = 35
-IMAP_MSG_SAVED_LOCALLY = 36
-IMAP_MSG_DELETED_INCOMING = 37
-IMAP_UNHANDLED_ERROR = 38
-IMAP_UNREAD_MAIL = 39
-KEYMANAGER_LOOKING_FOR_KEY = 40
-KEYMANAGER_KEY_FOUND = 41
-KEYMANAGER_KEY_NOT_FOUND = 42
-KEYMANAGER_STARTED_KEY_GENERATION = 43
-KEYMANAGER_FINISHED_KEY_GENERATION = 44
-KEYMANAGER_DONE_UPLOADING_KEYS = 45
-SOLEDAD_INVALID_AUTH_TOKEN = 46
-SOLEDAD_SYNC_SEND_STATUS = 47
-SOLEDAD_SYNC_RECEIVE_STATUS = 48
-
-
-_EVENTRESPONSE_STATUS = descriptor.EnumDescriptor(
- name='Status',
- full_name='leap.common.events.EventResponse.Status',
- filename=None,
- file=DESCRIPTOR,
- values=[
- descriptor.EnumValueDescriptor(
- name='OK', index=0, number=1,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='UNAUTH', index=1, number=2,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='ERROR', index=2, number=3,
- options=None,
- type=None),
- ],
- containing_type=None,
- options=None,
- serialized_start=515,
- serialized_end=554,
-)
-
-
-_SIGNALREQUEST = descriptor.Descriptor(
- name='SignalRequest',
- full_name='leap.common.events.SignalRequest',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- descriptor.FieldDescriptor(
- name='event', full_name='leap.common.events.SignalRequest.event', index=0,
- number=1, type=14, cpp_type=8, label=2,
- has_default_value=False, default_value=1,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='content', full_name='leap.common.events.SignalRequest.content', index=1,
- number=2, type=9, cpp_type=9, label=2,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac_method', full_name='leap.common.events.SignalRequest.mac_method', index=2,
- number=3, type=9, cpp_type=9, label=2,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac', full_name='leap.common.events.SignalRequest.mac', index=3,
- number=4, type=12, cpp_type=9, label=2,
- has_default_value=False, default_value="",
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='enc_method', full_name='leap.common.events.SignalRequest.enc_method', index=4,
- number=5, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='error_occurred', full_name='leap.common.events.SignalRequest.error_occurred', index=5,
- number=6, type=8, cpp_type=7, label=1,
- has_default_value=False, default_value=False,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- options=None,
- is_extendable=False,
- extension_ranges=[],
- serialized_start=37,
- serialized_end=188,
-)
-
-
-_REGISTERREQUEST = descriptor.Descriptor(
- name='RegisterRequest',
- full_name='leap.common.events.RegisterRequest',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- descriptor.FieldDescriptor(
- name='event', full_name='leap.common.events.RegisterRequest.event', index=0,
- number=1, type=14, cpp_type=8, label=2,
- has_default_value=False, default_value=1,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='port', full_name='leap.common.events.RegisterRequest.port', index=1,
- number=2, type=5, cpp_type=1, label=2,
- has_default_value=False, default_value=0,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac_method', full_name='leap.common.events.RegisterRequest.mac_method', index=2,
- number=3, type=9, cpp_type=9, label=2,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac', full_name='leap.common.events.RegisterRequest.mac', index=3,
- number=4, type=12, cpp_type=9, label=2,
- has_default_value=False, default_value="",
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- options=None,
- is_extendable=False,
- extension_ranges=[],
- serialized_start=190,
- serialized_end=296,
-)
-
-
-_UNREGISTERREQUEST = descriptor.Descriptor(
- name='UnregisterRequest',
- full_name='leap.common.events.UnregisterRequest',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- descriptor.FieldDescriptor(
- name='event', full_name='leap.common.events.UnregisterRequest.event', index=0,
- number=1, type=14, cpp_type=8, label=2,
- has_default_value=False, default_value=1,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='port', full_name='leap.common.events.UnregisterRequest.port', index=1,
- number=2, type=5, cpp_type=1, label=2,
- has_default_value=False, default_value=0,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac_method', full_name='leap.common.events.UnregisterRequest.mac_method', index=2,
- number=3, type=9, cpp_type=9, label=2,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac', full_name='leap.common.events.UnregisterRequest.mac', index=3,
- number=4, type=12, cpp_type=9, label=2,
- has_default_value=False, default_value="",
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- options=None,
- is_extendable=False,
- extension_ranges=[],
- serialized_start=298,
- serialized_end=406,
-)
-
-
-_PINGREQUEST = descriptor.Descriptor(
- name='PingRequest',
- full_name='leap.common.events.PingRequest',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- options=None,
- is_extendable=False,
- extension_ranges=[],
- serialized_start=408,
- serialized_end=421,
-)
-
-
-_EVENTRESPONSE = descriptor.Descriptor(
- name='EventResponse',
- full_name='leap.common.events.EventResponse',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- descriptor.FieldDescriptor(
- name='status', full_name='leap.common.events.EventResponse.status', index=0,
- number=1, type=14, cpp_type=8, label=2,
- has_default_value=False, default_value=1,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='result', full_name='leap.common.events.EventResponse.result', index=1,
- number=2, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- _EVENTRESPONSE_STATUS,
- ],
- options=None,
- is_extendable=False,
- extension_ranges=[],
- serialized_start=424,
- serialized_end=554,
-)
-
-_SIGNALREQUEST.fields_by_name['event'].enum_type = _EVENT
-_REGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT
-_UNREGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT
-_EVENTRESPONSE.fields_by_name['status'].enum_type = _EVENTRESPONSE_STATUS
-_EVENTRESPONSE_STATUS.containing_type = _EVENTRESPONSE;
-DESCRIPTOR.message_types_by_name['SignalRequest'] = _SIGNALREQUEST
-DESCRIPTOR.message_types_by_name['RegisterRequest'] = _REGISTERREQUEST
-DESCRIPTOR.message_types_by_name['UnregisterRequest'] = _UNREGISTERREQUEST
-DESCRIPTOR.message_types_by_name['PingRequest'] = _PINGREQUEST
-DESCRIPTOR.message_types_by_name['EventResponse'] = _EVENTRESPONSE
-
-class SignalRequest(message.Message):
- __metaclass__ = reflection.GeneratedProtocolMessageType
- DESCRIPTOR = _SIGNALREQUEST
-
- # @@protoc_insertion_point(class_scope:leap.common.events.SignalRequest)
-
-class RegisterRequest(message.Message):
- __metaclass__ = reflection.GeneratedProtocolMessageType
- DESCRIPTOR = _REGISTERREQUEST
-
- # @@protoc_insertion_point(class_scope:leap.common.events.RegisterRequest)
-
-class UnregisterRequest(message.Message):
- __metaclass__ = reflection.GeneratedProtocolMessageType
- DESCRIPTOR = _UNREGISTERREQUEST
-
- # @@protoc_insertion_point(class_scope:leap.common.events.UnregisterRequest)
-
-class PingRequest(message.Message):
- __metaclass__ = reflection.GeneratedProtocolMessageType
- DESCRIPTOR = _PINGREQUEST
-
- # @@protoc_insertion_point(class_scope:leap.common.events.PingRequest)
-
-class EventResponse(message.Message):
- __metaclass__ = reflection.GeneratedProtocolMessageType
- DESCRIPTOR = _EVENTRESPONSE
-
- # @@protoc_insertion_point(class_scope:leap.common.events.EventResponse)
-
-
-_EVENTSSERVERSERVICE = descriptor.ServiceDescriptor(
- name='EventsServerService',
- full_name='leap.common.events.EventsServerService',
- file=DESCRIPTOR,
- index=0,
- options=None,
- serialized_start=1871,
- serialized_end=2220,
- methods=[
- descriptor.MethodDescriptor(
- name='ping',
- full_name='leap.common.events.EventsServerService.ping',
- index=0,
- containing_service=None,
- input_type=_PINGREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
- descriptor.MethodDescriptor(
- name='register',
- full_name='leap.common.events.EventsServerService.register',
- index=1,
- containing_service=None,
- input_type=_REGISTERREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
- descriptor.MethodDescriptor(
- name='unregister',
- full_name='leap.common.events.EventsServerService.unregister',
- index=2,
- containing_service=None,
- input_type=_UNREGISTERREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
- descriptor.MethodDescriptor(
- name='signal',
- full_name='leap.common.events.EventsServerService.signal',
- index=3,
- containing_service=None,
- input_type=_SIGNALREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
-])
-
-class EventsServerService(service.Service):
- __metaclass__ = service_reflection.GeneratedServiceType
- DESCRIPTOR = _EVENTSSERVERSERVICE
-class EventsServerService_Stub(EventsServerService):
- __metaclass__ = service_reflection.GeneratedServiceStubType
- DESCRIPTOR = _EVENTSSERVERSERVICE
-
-
-_EVENTSCLIENTSERVICE = descriptor.ServiceDescriptor(
- name='EventsClientService',
- full_name='leap.common.events.EventsClientService',
- file=DESCRIPTOR,
- index=1,
- options=None,
- serialized_start=2223,
- serialized_end=2400,
- methods=[
- descriptor.MethodDescriptor(
- name='ping',
- full_name='leap.common.events.EventsClientService.ping',
- index=0,
- containing_service=None,
- input_type=_PINGREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
- descriptor.MethodDescriptor(
- name='signal',
- full_name='leap.common.events.EventsClientService.signal',
- index=1,
- containing_service=None,
- input_type=_SIGNALREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
-])
-
-class EventsClientService(service.Service):
- __metaclass__ = service_reflection.GeneratedServiceType
- DESCRIPTOR = _EVENTSCLIENTSERVICE
-class EventsClientService_Stub(EventsClientService):
- __metaclass__ = service_reflection.GeneratedServiceStubType
- DESCRIPTOR = _EVENTSCLIENTSERVICE
-
-# @@protoc_insertion_point(module_scope)
diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py
index 41aede3..a69202e 100644
--- a/src/leap/common/events/server.py
+++ b/src/leap/common/events/server.py
@@ -14,223 +14,79 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-A server for the events mechanism.
-
-A server can receive different kinds of requests from clients:
- 1. Registration request: store client port number to be notified when
- a specific signal arrives.
- 2. Signal request: redistribute the signal to registered clients.
"""
-import logging
-import socket
+The server for the events mechanism.
+"""
-from protobuf.socketrpc import RpcService
-from leap.common.events import (
- events_pb2 as proto,
- daemon,
-)
+import logging
+import txzmq
+from leap.common.zmq_utils import zmq_has_curve
-logger = logging.getLogger(__name__)
+from leap.common.events.zmq_components import TxZmqServerComponent
-SERVER_PORT = 8090
+if zmq_has_curve():
+ EMIT_ADDR = "tcp://127.0.0.1:9000"
+ REG_ADDR = "tcp://127.0.0.1:9001"
+else:
+ EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0"
+ REG_ADDR = "ipc:///tmp/leap.common.events.socket.1"
-# the `registered_clients` dictionary below should have the following
-# format:
-#
-# { event_signal: [ port, ... ], ... }
-#
-registered_clients = {}
-
-class PortAlreadyTaken(Exception):
- """
- Raised when trying to open a server in a port that is already taken.
- """
- pass
+logger = logging.getLogger(__name__)
-def ensure_server(port=SERVER_PORT):
+def ensure_server(emit_addr=EMIT_ADDR, reg_addr=REG_ADDR):
"""
- Make sure the server is running on the given port.
+ Make sure the server is running in the given addresses.
- Attempt to connect to given local port. Upon success, assume that the
- events server has already been started. Upon failure, start events server.
+ :param emit_addr: The address in which to receive events from clients.
+ :type emit_addr: str
+ :param reg_addr: The address to which publish events to clients.
+ :type reg_addr: str
- :param port: the port in which server should be listening
- :type port: int
-
- :return: the daemon instance or nothing
- :rtype: EventsServerDaemon or None
-
- :raise PortAlreadyTaken: Raised if C{port} is already taken by something
- that is not an events server.
- """
- try:
- # check if port is available
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('localhost', port))
- s.close()
- # port is taken, check if there's a server running there
- response = ping(port=port, timeout=1000)
- if response is not None and response.status == proto.EventResponse.OK:
- logger.info('A server is already running on port %d.', port)
- return
- # port is taken, and not by an events server
- logger.warning(
- 'Port %d is taken by something not an events server.', port)
- raise PortAlreadyTaken(port)
- except socket.error:
- # port is available, run a server
- logger.info('Launching server on port %d.', port)
- return EventsServerDaemon.ensure(port)
-
-
-def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
+ :return: an events server instance
+ :rtype: EventsServer
"""
- Ping the server.
-
- :param port: the port in which server should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- request = proto.PingRequest()
- service = RpcService(
- proto.EventsServerService_Stub,
- port,
- 'localhost')
- logger.debug("Pinging server in port %d..." % port)
- return service.ping(request, callback=reqcbk, timeout=timeout)
+ _server = EventsServer(emit_addr, reg_addr)
+ return _server
-class EventsServerService(proto.EventsServerService):
+class EventsServer(TxZmqServerComponent):
"""
- Service for receiving events in clients.
+ An events server that listens for events in one address and publishes those
+ events in another address.
"""
- def register(self, controller, request, done):
- """
- Register a client port to be signaled when specific events come in.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.info("Received registration request: %s..." % str(request)[:40])
- # add client port to signal list
- if request.event not in registered_clients:
- registered_clients[request.event] = set([])
- registered_clients[request.event].add(request.port)
- # send response back to client
-
- logger.debug('sending response back')
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
-
- def unregister(self, controller, request, done):
- """
- Unregister a client port so it will not be signaled when specific
- events come in.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.info(
- "Received unregistration request: %s..." % str(request)[:40])
- # remove client port from signal list
- response = proto.EventResponse()
- if request.event in registered_clients:
- try:
- registered_clients[request.event].remove(request.port)
- response.status = proto.EventResponse.OK
- except KeyError:
- response.status = proto.EventsResponse.ERROR
- response.result = 'Port %d not registered.' % request.port
- # send response back to client
- logger.debug('sending response back')
- done.run(response)
-
- def signal(self, controller, request, done):
- """
- Perform an RPC call to signal all clients registered to receive a
- specific signal.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.SignalRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.debug('Received signal from client: %s...', str(request)[:40])
- # send signal to all registered clients
- # TODO: verify signal auth
- if request.event in registered_clients:
- for port in registered_clients[request.event]:
-
- def callback(req, resp):
- logger.debug("Signal received by " + str(port))
-
- service = RpcService(proto.EventsClientService_Stub,
- port, 'localhost')
- service.signal(request, callback=callback)
- # send response back to client
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
-
- def ping(self, controller, request, done):
+ def __init__(self, emit_addr, reg_addr):
"""
- Reply to a ping request.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.debug("Received ping request, sending response.")
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
-
+ Initialize the events server.
-class EventsServerDaemon(daemon.EventsSingletonDaemon):
- """
- Singleton class for starting an events server daemon.
- """
-
- @classmethod
- def ensure(cls, port):
+ :param emit_addr: The address in which to receive events from clients.
+ :type emit_addr: str
+ :param reg_addr: The address to which publish events to clients.
+ :type reg_addr: str
"""
- Make sure the daemon is running on the given port.
-
- :param port: the port in which the daemon should listen
- :type port: int
+ TxZmqServerComponent.__init__(self)
+ # bind PULL and PUB sockets
+ self._pull, self.pull_port = self._zmq_bind(
+ txzmq.ZmqPullConnection, emit_addr)
+ self._pub, self.pub_port = self._zmq_bind(
+ txzmq.ZmqPubConnection, reg_addr)
+ # set a handler for arriving messages
+ self._pull.onPull = self._onPull
+
+ def _onPull(self, message):
+ """
+ Callback executed when a message is pulled from a client.
- :return: a daemon instance
- :rtype: EventsServerDaemon
+ :param message: The message sent by the client.
+ :type message: str
"""
- return cls.ensure_service(port, EventsServerService())
+ event, content = message[0].split(b"\0", 1)
+ logger.debug("Publishing event: %s" % event)
+ self._pub.publish(content, tag=event)
diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py
new file mode 100644
index 0000000..8206ed5
--- /dev/null
+++ b/src/leap/common/events/txclient.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+# txclient.py
+# Copyright (C) 2013, 2014, 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+The client end point of the events mechanism, implemented using txzmq.
+
+Clients are the communicating parties of the events mechanism. They
+communicate by sending messages to a server, which in turn redistributes
+messages to other clients.
+
+When a client registers a callback for a given event, it also tells the
+server that it wants to be notified whenever events of that type are sent by
+some other client.
+"""
+
+
+import logging
+import pickle
+
+import txzmq
+
+from leap.common.events.zmq_components import TxZmqClientComponent
+from leap.common.events.client import EventsClient
+from leap.common.events.client import configure_client
+from leap.common.events.server import EMIT_ADDR
+from leap.common.events.server import REG_ADDR
+from leap.common.events import catalog
+
+
+logger = logging.getLogger(__name__)
+
+
+__all__ = [
+ "configure_client",
+ "EventsTxClient",
+ "register",
+ "unregister",
+ "emit",
+ "shutdown",
+]
+
+
+class EventsTxClient(TxZmqClientComponent, EventsClient):
+ """
+ A twisted events client that listens for events in one address and
+ publishes those events to another address.
+ """
+
+ def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
+ path_prefix=None):
+ """
+ Initialize the events server.
+ """
+ TxZmqClientComponent.__init__(self, path_prefix=path_prefix)
+ EventsClient.__init__(self, emit_addr, reg_addr)
+ # connect SUB first, otherwise we might miss some event sent from this
+ # same client
+ self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
+ self._sub.gotMessage = self._gotMessage
+ self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)
+
+ def _gotMessage(self, msg, tag):
+ """
+ Handle an incoming event.
+
+ :param msg: The incoming message.
+ :type msg: list(str)
+ """
+ event = getattr(catalog, tag)
+ content = pickle.loads(msg)
+ self._handle_event(event, content)
+
+ def _subscribe(self, tag):
+ """
+ Subscribe to a tag on the zmq SUB socket.
+
+ :param tag: The tag to be subscribed.
+ :type tag: str
+ """
+ self._sub.subscribe(tag)
+
+ def _unsubscribe(self, tag):
+ """
+ Unsubscribe from a tag on the zmq SUB socket.
+
+ :param tag: The tag to be unsubscribed.
+ :type tag: str
+ """
+ self._sub.unsubscribe(tag)
+
+ def _send(self, data):
+ """
+ Send data through PUSH socket.
+
+ :param data: The data to be sent.
+ :type event: str
+ """
+ self._push.send(data)
+
+ def shutdown(self):
+ TxZmqClientComponent.shutdown(self)
+ EventsClient.shutdown(self)
+
+
+def register(event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param callback: The callback to be executed.
+ :type callback: callable(event, content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a callback
+ identified by the given uid and replace is False.
+ """
+ return EventsTxClient.instance().register(
+ event, callback, uid=uid, replace=replace)
+
+
+def unregister(event, uid=None):
+ """
+ Unregister callbacks for an event.
+
+ If uid is not None, then only the callback identified by the given uid is
+ removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ return EventsTxClient.instance().unregister(event, uid=uid)
+
+
+def emit(event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: str
+ :param content: The content of the event.
+ :type content: list
+ """
+ return EventsTxClient.instance().emit(event, *content)
+
+
+def shutdown():
+ """
+ Shutdown the events client.
+ """
+ EventsTxClient.instance().shutdown()
+
+
+def instance():
+ """
+ Return an instance of the events client.
+
+ :return: An instance of the events client.
+ :rtype: EventsClientThread
+ """
+ return EventsTxClient.instance()
diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py
new file mode 100644
index 0000000..4fb95d3
--- /dev/null
+++ b/src/leap/common/events/zmq_components.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+# zmq.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+The server for the events mechanism.
+"""
+
+
+import os
+import logging
+import txzmq
+import re
+
+from abc import ABCMeta
+
+# XXX some distros don't package libsodium, so we have to be prepared for
+# absence of zmq.auth
+try:
+ import zmq.auth
+ from zmq.auth.thread import ThreadAuthenticator
+except ImportError:
+ pass
+
+from leap.common.config import get_path_prefix
+from leap.common.zmq_utils import zmq_has_curve
+from leap.common.zmq_utils import maybe_create_and_get_certificates
+from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
+
+
+logger = logging.getLogger(__name__)
+
+
+ADDRESS_RE = re.compile("(.+)://(.+):([0-9]+)")
+
+
+class TxZmqComponent(object):
+ """
+ A twisted-powered zmq events component.
+ """
+
+ __metaclass__ = ABCMeta
+
+ _component_type = None
+
+ def __init__(self, path_prefix=None):
+ """
+ Initialize the txzmq component.
+ """
+ self._factory = txzmq.ZmqFactory()
+ self._factory.registerForShutdown()
+ if path_prefix == None:
+ path_prefix = get_path_prefix()
+ self._config_prefix = os.path.join(path_prefix, "leap", "events")
+ self._connections = []
+
+ @property
+ def component_type(self):
+ if not self._component_type:
+ raise Exception(
+ "Make sure implementations of TxZmqComponent"
+ "define a self._component_type!")
+ return self._component_type
+
+ def _zmq_connect(self, connClass, address):
+ """
+ Connect to an address.
+
+ :param connClass: The connection class to be used.
+ :type connClass: txzmq.ZmqConnection
+ :param address: The address to connect to.
+ :type address: str
+
+ :return: The binded connection.
+ :rtype: txzmq.ZmqConnection
+ """
+ connection = connClass(self._factory)
+ # create and configure socket
+ socket = connection.socket
+ if zmq_has_curve():
+ public, secret = maybe_create_and_get_certificates(
+ self._config_prefix, self.component_type)
+ server_public_file = os.path.join(
+ self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
+ server_public, _ = zmq.auth.load_certificate(server_public_file)
+ socket.curve_publickey = public
+ socket.curve_secretkey = secret
+ socket.curve_serverkey = server_public
+ socket.connect(address)
+ logger.debug("Connected %s to %s." % (connClass, address))
+ self._connections.append(connection)
+ return connection
+
+ def _zmq_bind(self, connClass, address):
+ """
+ Bind to an address.
+
+ :param connClass: The connection class to be used.
+ :type connClass: txzmq.ZmqConnection
+ :param address: The address to bind to.
+ :type address: str
+
+ :return: The binded connection and port.
+ :rtype: (txzmq.ZmqConnection, int)
+ """
+ connection = connClass(self._factory)
+ socket = connection.socket
+ if zmq_has_curve():
+ public, secret = maybe_create_and_get_certificates(
+ self._config_prefix, self.component_type)
+ socket.curve_publickey = public
+ socket.curve_secretkey = secret
+ self._start_thread_auth(connection.socket)
+ # check if port was given
+ protocol, addr, port = ADDRESS_RE.match(address).groups()
+ if port == "0":
+ port = socket.bind_to_random_port("%s://%s" % (protocol, addr))
+ else:
+ socket.bind(address)
+ port = int(port)
+ logger.debug("Binded %s to %s://%s:%d."
+ % (connClass, protocol, addr, port))
+ self._connections.append(connection)
+ return connection, port
+
+ def _start_thread_auth(self, socket):
+ """
+ Start the zmq curve thread authenticator.
+
+ :param socket: The socket in which to configure the authenticator.
+ :type socket: zmq.Socket
+ """
+ authenticator = ThreadAuthenticator(self._factory.context)
+ authenticator.start()
+ # XXX do not hardcode this here.
+ authenticator.allow('127.0.0.1')
+ # tell authenticator to use the certificate in a directory
+ public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX)
+ authenticator.configure_curve(domain="*", location=public_keys_dir)
+ socket.curve_server = True # must come before bind
+
+ def shutdown(self):
+ """
+ Shutdown the component.
+ """
+ logger.debug("Shutting down component %s." % str(self))
+ for conn in self._connections:
+ conn.shutdown()
+ self._factory.shutdown()
+
+
+class TxZmqServerComponent(TxZmqComponent):
+ """
+ A txZMQ server component.
+ """
+
+ _component_type = "server"
+
+
+class TxZmqClientComponent(TxZmqComponent):
+ """
+ A txZMQ client component.
+ """
+
+ _component_type = "client"
diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py
index 0779b2e..7ef3e1b 100644
--- a/src/leap/common/tests/test_events.py
+++ b/src/leap/common/tests/test_events.py
@@ -15,414 +15,156 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-import sets
-import time
-import socket
-import threading
-import random
-
-
-from mock import Mock
-from protobuf.socketrpc import RpcService
-from leap.common import events
-from leap.common.events import (
- server,
- client,
- mac_auth,
-)
-from leap.common.events.events_pb2 import (
- EventsServerService,
- EventsServerService_Stub,
- EventsClientService_Stub,
- EventResponse,
- SignalRequest,
- RegisterRequest,
- PingRequest,
- SOLEDAD_CREATING_KEYS,
- CLIENT_UID,
-)
+import os
+import logging
+import time
-port = 8090
+from twisted.trial import unittest
+from twisted.internet import defer
-received = False
+from leap.common.events import server
+from leap.common.events import client
+from leap.common.events import txclient
+from leap.common.events import catalog
+from leap.common.events.errors import CallbackAlreadyRegisteredError
-class EventsTestCase(unittest.TestCase):
+if 'DEBUG' in os.environ:
+ logging.basicConfig(level=logging.DEBUG)
- @classmethod
- def setUpClass(cls):
- server.EventsServerDaemon.ensure(8090)
- cls.callbacks = events.client.registered_callbacks
- @classmethod
- def tearDownClass(cls):
- # give some time for requests to be processed.
- time.sleep(1)
+class EventsGenericClientTestCase(object):
def setUp(self):
- super(EventsTestCase, self).setUp()
+ self._server = server.ensure_server(
+ emit_addr="tcp://127.0.0.1:0",
+ reg_addr="tcp://127.0.0.1:0")
+ self._client.configure_client(
+ emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
+ reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)
def tearDown(self):
- #events.client.registered_callbacks = {}
- server.registered_clients = {}
- super(EventsTestCase, self).tearDown()
-
- def test_service_singleton(self):
- """
- Ensure that there's always just one instance of the server daemon
- running.
- """
- service1 = server.EventsServerDaemon.ensure(8090)
- service2 = server.EventsServerDaemon.ensure(8090)
- self.assertEqual(service1, service2,
- "Can't get singleton class for service.")
+ self._client.shutdown()
+ self._server.shutdown()
+ # wait a bit for sockets to close properly
+ time.sleep(0.1)
def test_client_register(self):
"""
Ensure clients can register callbacks.
"""
- self.assertTrue(1 not in self.callbacks,
- 'There should should be no callback for this signal.')
- events.register(1, lambda x: True)
- self.assertTrue(1 in self.callbacks,
- 'Could not register signal in local client.')
- events.register(2, lambda x: True)
- self.assertTrue(1 in self.callbacks,
- 'Could not register signal in local client.')
- self.assertTrue(2 in self.callbacks,
- 'Could not register signal in local client.')
+ callbacks = self._client.instance().callbacks
+ self.assertTrue(len(callbacks) == 0,
+ 'There should be no callback for this event.')
+ # register one event
+ event1 = catalog.CLIENT_UID
+ cbk1 = lambda event, _: True
+ uid1 = self._client.register(event1, cbk1)
+ # assert for correct registration
+ self.assertTrue(len(callbacks) == 1)
+ self.assertTrue(callbacks[event1][uid1] == cbk1,
+ 'Could not register event in local client.')
+ # register another event
+ event2 = catalog.CLIENT_SESSION_ID
+ cbk2 = lambda event, _: True
+ uid2 = self._client.register(event2, cbk2)
+ # assert for correct registration
+ self.assertTrue(len(callbacks) == 2)
+ self.assertTrue(callbacks[event2][uid2] == cbk2,
+ 'Could not register event in local client.')
def test_register_signal_replace(self):
"""
Make sure clients can replace already registered callbacks.
"""
- sig = 3
- cbk = lambda x: True
- events.register(sig, cbk, uid=1)
- self.assertRaises(Exception, events.register, sig, lambda x: True,
- uid=1)
- events.register(sig, lambda x: True, uid=1, replace=True)
- self.assertTrue(sig in self.callbacks, 'Could not register signal.')
- self.assertEqual(1, len(self.callbacks[sig]),
- 'Wrong number of registered callbacks.')
-
- def test_signal_response_status(self):
- """
- Ensure there's an appropriate response from server when signaling.
- """
- sig = 4
- request = SignalRequest()
- request.event = sig
- request.content = 'my signal contents'
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- # test synch
- response = service.signal(request, timeout=1000)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
+ cbk_fail = lambda event, _: d.errback(event)
+ cbk_succeed = lambda event, _: d.callback(event)
+ self._client.register(event, cbk_fail, uid=1)
+ self._client.register(event, cbk_succeed, uid=1, replace=True)
+ self._client.emit(event, None)
+ return d
- def test_signal_executes_callback(self):
- """
- Ensure callback is executed upon receiving signal.
+ def test_register_signal_replace_fails_when_replace_is_false(self):
"""
- sig = CLIENT_UID
- request = SignalRequest()
- request.event = sig
- request.content = 'my signal contents'
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
-
- # register a callback
- flag = Mock()
- events.register(sig, lambda req: flag(req.event))
- # signal
- response = service.signal(request)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
- time.sleep(1) # wait for signal to arrive
- flag.assert_called_once_with(sig)
-
- def test_events_server_service_register(self):
+ Make sure clients trying to replace already registered callbacks fail
+ when replace=False
"""
- Ensure the server can register clients to be signaled.
- """
- sig = 5
- request = RegisterRequest()
- request.event = sig
- request.port = 8091
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- complist = server.registered_clients
- self.assertEqual({}, complist,
- 'There should be no registered_ports when '
- 'server has just been created.')
- response = service.register(request, timeout=1000)
- self.assertTrue(sig in complist, "Signal not registered succesfully.")
- self.assertTrue(8091 in complist[sig],
- 'Failed registering client port.')
+ event = catalog.CLIENT_UID
+ self._client.register(event, lambda event, _: None, uid=1)
+ self.assertRaises(
+ CallbackAlreadyRegisteredError,
+ self._client.register,
+ event, lambda event, _: None, uid=1, replace=False)
- def test_client_request_register(self):
+ def test_register_more_than_one_callback_works(self):
"""
- Ensure clients can register themselves with server.
+ Make sure clients can replace already registered callbacks.
"""
- sig = 6
- complist = server.registered_clients
- self.assertTrue(sig not in complist,
- 'There should be no registered clients for this '
- 'signal.')
- events.register(sig, lambda x: True)
- time.sleep(0.1)
- port = client.EventsClientDaemon.get_instance().get_port()
- self.assertTrue(sig in complist, 'Failed registering client.')
- self.assertTrue(port in complist[sig],
- 'Failed registering client port.')
+ event = catalog.CLIENT_UID
+ d1 = defer.Deferred()
+ cbk1 = lambda event, _: d1.callback(event)
+ d2 = defer.Deferred()
+ cbk2 = lambda event, _: d2.callback(event)
+ self._client.register(event, cbk1)
+ self._client.register(event, cbk2)
+ self._client.emit(event, None)
+ d = defer.gatherResults([d1, d2])
+ return d
def test_client_receives_signal(self):
"""
Ensure clients can receive signals.
"""
- sig = 7
- flag = Mock()
-
- events.register(sig, lambda req: flag(req.event))
- request = SignalRequest()
- request.event = sig
- request.content = ""
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- response = service.signal(request, timeout=1000)
- self.assertTrue(response is not None, 'Did not receive response.')
- time.sleep(0.5)
- flag.assert_called_once_with(sig)
-
- def test_client_send_signal(self):
- """
- Ensure clients can send signals.
- """
- sig = 8
- response = events.signal(sig)
- self.assertTrue(response.status == response.OK,
- 'Received wrong response status when signaling.')
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
+ def cbk(events, _):
+ d.callback(event)
+ self._client.register(event, cbk)
+ self._client.emit(event, None)
+ return d
def test_client_unregister_all(self):
"""
Test that the client can unregister all events for one signal.
"""
- sig = CLIENT_UID
- complist = server.registered_clients
- events.register(sig, lambda x: True)
- events.register(sig, lambda x: True)
- time.sleep(0.1)
- events.unregister(sig)
- time.sleep(0.1)
- port = client.EventsClientDaemon.get_instance().get_port()
- self.assertFalse(bool(complist[sig]))
- self.assertTrue(port not in complist[sig])
+ event1 = catalog.CLIENT_UID
+ d = defer.Deferred()
+ # register more than one callback for the same event
+ self._client.register(event1, lambda ev, _: d.errback(None))
+ self._client.register(event1, lambda ev, _: d.errback(None))
+ # unregister and emit the event
+ self._client.unregister(event1)
+ self._client.emit(event1, None)
+ # register and emit another event so the deferred can succeed
+ event2 = catalog.CLIENT_SESSION_ID
+ self._client.register(event2, lambda ev, _: d.callback(None))
+ self._client.emit(event2, None)
+ return d
def test_client_unregister_by_uid(self):
"""
Test that the client can unregister an event by uid.
"""
- sig = CLIENT_UID
- complist = server.registered_clients
- events.register(sig, lambda x: True, uid='cbkuid')
- events.register(sig, lambda x: True, uid='cbkuid2')
- time.sleep(0.1)
- events.unregister(sig, uid='cbkuid')
- time.sleep(0.1)
- port = client.EventsClientDaemon.get_instance().get_port()
- self.assertTrue(sig in complist)
- self.assertTrue(len(complist[sig]) == 1)
- self.assertTrue(
- client.registered_callbacks[sig].pop()[0] == 'cbkuid2')
- self.assertTrue(port in complist[sig])
-
- def test_server_replies_ping(self):
- """
- Ensure server replies to a ping.
- """
- request = PingRequest()
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- response = service.ping(request, timeout=1000)
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_client_replies_ping(self):
- """
- Ensure clients reply to a ping.
- """
- daemon = client.ensure_client_daemon()
- port = daemon.get_port()
- request = PingRequest()
- service = RpcService(EventsClientService_Stub, port, 'localhost')
- response = service.ping(request, timeout=1000)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_server_ping(self):
- """
- Ensure the function from server module pings correctly.
- """
- response = server.ping()
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_client_ping(self):
- """
- Ensure the function from client module pings correctly.
- """
- daemon = client.ensure_client_daemon()
- response = client.ping(daemon.get_port())
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_module_ping_server(self):
- """
- Ensure the function from main module pings server correctly.
- """
- response = events.ping_server()
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_module_ping_client(self):
- """
- Ensure the function from main module pings clients correctly.
- """
- daemon = client.ensure_client_daemon()
- response = events.ping_client(daemon.get_port())
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_ensure_server_raises_if_port_taken(self):
- """
- Verify that server raises an exception if port is already taken.
- """
- # get a random free port
- while True:
- port = random.randint(1024, 65535)
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('localhost', port))
- s.close()
- except:
- break
-
- class PortBlocker(threading.Thread):
-
- def run(self):
- conns = 0
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.bind(('localhost', port))
- s.setblocking(1)
- s.listen(1)
- while conns < 2: # blocks until rece
- conns += 1
- s.accept()
- s.close()
-
- # block the port
- taker = PortBlocker()
- taker.start()
- time.sleep(1) # wait for thread to start.
- self.assertRaises(
- server.PortAlreadyTaken, server.ensure_server, port)
-
- def test_async_register(self):
- """
- Test asynchronous registering of callbacks.
- """
- flag = Mock()
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
+ # register one callback that would fail
+ uid = self._client.register(event, lambda ev, _: d.errback(None))
+ # register one callback that will succeed
+ self._client.register(event, lambda ev, _: d.callback(None))
+ # unregister by uid and emit the event
+ self._client.unregister(event, uid=uid)
+ self._client.emit(event, None)
+ return d
- # executed after async register, when response is received from server
- def reqcbk(request, response):
- flag(request.event, response.status)
- # callback registered by application
- def callback(request):
- pass
+class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
- # passing a callback as reqcbk param makes the call asynchronous
- result = events.register(CLIENT_UID, callback, reqcbk=reqcbk)
- self.assertIsNone(result)
- events.signal(CLIENT_UID)
- time.sleep(1) # wait for signal to arrive from server
- flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
+ _client = txclient
- def test_async_signal(self):
- """
- Test asynchronous registering of callbacks.
- """
- flag = Mock()
-
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(request.event, response.status)
-
- # passing a callback as reqcbk param makes the call asynchronous
- result = events.signal(CLIENT_UID, reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for signal to arrive from server
- flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
-
- def test_async_unregister(self):
- """
- Test asynchronous unregistering of callbacks.
- """
- flag = Mock()
-
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(request.event, response.status)
-
- # callback registered by application
- def callback(request):
- pass
-
- # passing a callback as reqcbk param makes the call asynchronous
- events.register(CLIENT_UID, callback)
- result = events.unregister(CLIENT_UID, reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for signal to arrive from server
- flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
-
- def test_async_ping_server(self):
- """
- Test asynchronous pinging of server.
- """
- flag = Mock()
-
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(response.status)
-
- result = events.ping_server(reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for response to arrive from server.
- flag.assert_called_once_with(EventResponse.OK)
-
- def test_async_ping_client(self):
- """
- Test asynchronous pinging of client.
- """
- flag = Mock()
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(response.status)
+class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
- daemon = client.ensure_client_daemon()
- result = events.ping_client(daemon.get_port(), reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for response to arrive from server.
- flag.assert_called_once_with(EventResponse.OK)
+ _client = client
diff --git a/src/leap/common/zmq_utils.py b/src/leap/common/zmq_utils.py
new file mode 100644
index 0000000..19625b9
--- /dev/null
+++ b/src/leap/common/zmq_utils.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+# zmq.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Utilities to handle ZMQ certificates.
+"""
+import os
+import logging
+import stat
+import shutil
+
+import zmq
+
+try:
+ import zmq.auth
+except ImportError:
+ pass
+
+from leap.common.files import mkdir_p
+from leap.common.check import leap_assert
+
+logger = logging.getLogger(__name__)
+
+
+KEYS_PREFIX = "zmq_certificates"
+PUBLIC_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "public_keys")
+PRIVATE_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "private_keys")
+
+
+def zmq_has_curve():
+ """
+ Return whether the current ZMQ has support for auth and CurveZMQ security.
+
+ :rtype: bool
+
+ Version notes:
+ `zmq.curve_keypair()` is new in version 14.0, new in version libzmq-4.0.
+ Requires libzmq (>= 4.0) to have been linked with libsodium.
+ `zmq.auth` module is new in version 14.1
+ `zmq.has()` is new in version 14.1, new in version libzmq-4.1.
+ """
+ zmq_version = zmq.zmq_version_info()
+ pyzmq_version = zmq.pyzmq_version_info()
+
+ if pyzmq_version >= (14, 1, 0) and zmq_version >= (4, 1):
+ return zmq.has('curve')
+
+ if pyzmq_version < (14, 1, 0):
+ return False
+
+ if zmq_version < (4, 0):
+ # security is new in libzmq 4.0
+ return False
+
+ try:
+ zmq.curve_keypair()
+ except zmq.error.ZMQError:
+ # security requires libzmq to be linked against libsodium
+ return False
+
+ return True
+
+
+def assert_zmq_has_curve():
+ leap_assert(zmq_has_curve, "CurveZMQ not supported!")
+
+
+def maybe_create_and_get_certificates(basedir, name):
+ """
+ Generate the needed ZMQ certificates for backend/frontend communication if
+ needed.
+ """
+ assert_zmq_has_curve()
+ private_keys_dir = os.path.join(basedir, PRIVATE_KEYS_PREFIX)
+ private_key = os.path.join(
+ private_keys_dir, name + ".key_secret")
+ if not os.path.isfile(private_key):
+ mkdir_p(private_keys_dir)
+ zmq.auth.create_certificates(private_keys_dir, name)
+ # set permissions to: 0700 (U:rwx G:--- O:---)
+ os.chmod(private_key, stat.S_IRUSR | stat.S_IWUSR)
+ # move public key to public keys directory
+ public_keys_dir = os.path.join(basedir, PUBLIC_KEYS_PREFIX)
+ old_public_key = os.path.join(
+ private_keys_dir, name + ".key")
+ new_public_key = os.path.join(
+ public_keys_dir, name + ".key")
+ mkdir_p(public_keys_dir)
+ shutil.move(old_public_key, new_public_key)
+ return zmq.auth.load_certificate(private_key)
+
+