[feat] refactor events to use ZMQ
authordrebs <drebs@leap.se>
Wed, 4 Feb 2015 17:04:10 +0000 (15:04 -0200)
committerdrebs <drebs@leap.se>
Wed, 27 May 2015 17:37:27 +0000 (14:37 -0300)
Before this commit, protobuf and protobuf.socketrpc were used to serialize and
transmit messages between events clients. This change implements a simpler ZMQ
client/server events mechanism that uses ZMQ sockets for transmitting messages
from clients to server and to redistribute such messages to subscribed
clients.

Closes: #6359

17 files changed:
changes/feature_6359_modify-events-to-use-zmq [new file with mode: 0644]
pkg/requirements.pip
setup.cfg [new file with mode: 0644]
src/leap/common/events/Makefile [deleted file]
src/leap/common/events/README.rst
src/leap/common/events/__init__.py
src/leap/common/events/catalog.py [new file with mode: 0644]
src/leap/common/events/client.py
src/leap/common/events/daemon.py [deleted file]
src/leap/common/events/errors.py [moved from src/leap/common/events/mac_auth.py with 73% similarity]
src/leap/common/events/events.proto [deleted file]
src/leap/common/events/events_pb2.py [deleted file]
src/leap/common/events/server.py
src/leap/common/events/txclient.py [new file with mode: 0644]
src/leap/common/events/zmq_components.py [new file with mode: 0644]
src/leap/common/tests/test_events.py
src/leap/common/zmq_utils.py [new file with mode: 0644]

diff --git a/changes/feature_6359_modify-events-to-use-zmq b/changes/feature_6359_modify-events-to-use-zmq
new file mode 100644 (file)
index 0000000..04ef5ae
--- /dev/null
@@ -0,0 +1 @@
+  o Modify leap.common.events to use ZMQ (closes #6359). 
index 7346fde..f875344 100644 (file)
@@ -1,10 +1,10 @@
 jsonschema  #<=0.8 -- are we done with this conflict?
 dirspec
-protobuf>=2.4.1
-protobuf.socketrpc
 pyopenssl
 python-dateutil
 Twisted>=12.1
 zope.interface
+pyzmq
+txzmq
 
 #autopep8 -- ???
diff --git a/setup.cfg b/setup.cfg
new file mode 100644 (file)
index 0000000..c71bffa
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[aliases]
+test = trial
diff --git a/src/leap/common/events/Makefile b/src/leap/common/events/Makefile
deleted file mode 100644 (file)
index 5b7e60d..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-# -*- coding: utf-8 -*-
-# Makefile
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-# This file is used to generate protobuf python files that are used for IPC:
-#
-#   https://developers.google.com/protocol-buffers/docs/pythontutorial
-
-PROTOC = protoc
-
-all: events_pb2.py
-
-%_pb2.py: %.proto
-       $(PROTOC) --python_out=./ $<
-#      autopep8 --in-place --aggressive $@
-
-clean:
-       rm -f *_pb2.py
index 2e7f254..f455cc8 100644 (file)
@@ -1,19 +1,83 @@
 Events mechanism
 ================
 
-The events mechanism allows for clients to send signal events to each
-other by means of a centralized server. Clients can register with the
-server to receive signals of certain types, and they can also send signals to
-the server that will then redistribute these signals to registered clients.
+The events mechanism allows for clients to send events to each other by means
+of a centralized server. Clients can register with the server to receive
+events of certain types, and they can also send events to the server that will
+then redistribute these events to registered clients.
 
 
-Listening daemons
------------------
+ZMQ connections and events redistribution
+-----------------------------------------
 
-Both clients and the server listen for incoming messages by using a
-listening daemon that runs in its own thread. The server daemon has to be
-started explicitly, while clients daemon will be started whenever a
-client registers with the server to receive messages.
+Clients and server use ZMQ connection patterns to communicate. Clients can
+push events to the server, and may subscribe to events published by the
+server. The server in turn pulls events from clients and publishes them to
+subscribed clients.
+
+Clients connect to the server's zmq pub socket, and register to specific
+events indicating which callbacks should be executed when that event is
+received:
+
+
+                               EventsServer
+                              .------------.
+                              |PULL     PUB|
+                              '------------'
+                                         ^^
+                                         ||
+                            reg(1, cbk1) |'--------------. reg(2, cbk2)
+                                         |               |
+                                         |               |
+            .------------.    .------------.   .------------.
+            |PUSH     SUB|    |PUSH     SUB|   |PUSH     SUB|
+            '------------'    '------------'   '------------'
+             EventsClient      EventsClient     EventsClient
+
+
+A client that wants to send an event connects to the server's zmq pull socket
+and pushes the event to the server. The server then redistributes it to all
+clients subscribed to that event.
+
+
+                               EventsServer
+                              .------------.
+                              |PULL---->PUB|
+                              '------------'
+                               ^         |.
+                               |         |.
+sig(1, 'foo') .----------------'         |'...............
+              |                          |               .
+              |                          v               .
+            .------------.    .------------.   .------------.
+            |PUSH     SUB|    |PUSH     SUB|   |PUSH     SUB|
+            '------------'    '------------'   '------------'
+             EventsClient      EventsClient     EventsClient
+                                    |
+                                    v
+                              cbk1(1, 'foo')
+
+
+Any client may emit or subscribe to an event. ZMQ will manage sockets and
+reuse the connection whenever it can.
+
+
+                               EventsServer
+                              .------------.
+                              |PULL---->PUB|
+                              '------------'
+                                ^        .|
+                                |        .|
+sig(2, 'bar') .-----------------'        .'--------------.
+              |                          .               |
+              |                          .               v
+            .------------.    .------------.   .------------.
+            |PUSH     SUB|    |PUSH     SUB|   |PUSH     SUB|
+            '------------'    '------------'   '------------'
+             EventsClient      EventsClient     EventsClient
+                                                     |
+                                                     v
+                                               cbk2(2, 'bar')
 
 
 How to use it
@@ -22,32 +86,27 @@ How to use it
 To start the events server:
 
 >>> from leap.common.events import server
->>> server.ensure_server(port=8090)
+>>> server.ensure_server(
+        emit_addr="tcp://127.0.0.1:9000",
+        reg_addr="tcp://127.0.0.1:9001")
 
-To register a callback to be called when a given signal is raised:
+To register a callback to be called when a given event is raised:
 
->>> from leap.common.events import (
->>>     register,
->>>     events_pb2 as proto,
->>> )
+>>> from leap.common.events import register
+>>> from leap.common.events import catalog
 >>>
->>> def mycallback(sigreq):
->>>     print str(sigreq)
+>>> def mycbk(event, *content):
+>>>     print "%s, %s" (str(event),  str(content))
 >>>
->>> events.register(signal=proto.CLIENT_UID, callback=mycallback)
+>>> register(catalog.CLIENT_UID, callback=mycbk)
 
-To signal an event:
+To emit an event:
 
->>> from leap.common.events import (
->>>     signal,
->>>     events_pb2 as proto,
->>> )
->>> signal(proto.CLIENT_UID)
+>>> from leap.common.events import emit
+>>> from leap.common.events import catalog
+>>> emit(catalog.CLIENT_UID)
 
 Adding events
 -------------
 
-* Add the new event under enum ``Event`` in ``events.proto`` 
-* Compile the new protocolbuffers file::
-
-  make
+To add a new event, just add it to ``catalog.py``.
index 0cc6573..9269b9a 100644 (file)
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+
 """
-This is an events mechanism that uses a server to allow for signaling of
-events between clients.
+This is an events mechanism that uses a server to allow for emitting events
+between clients.
 
 Application components should use the interface available in this file to
-register callbacks to be executed upon receiving specific signals, and to send
-signals to other components.
+register callbacks to be executed upon receiving specific events, and to send
+events to other components.
 
-To register a callback to be executed when a specific event is signaled, use
+To register a callback to be executed when a specific event is emitted, use
 leap.common.events.register():
 
 >>> from leap.common.events import register
->>> from leap.common.events import events_pb2 as proto
->>> register(proto.CLIENT_UID, lambda req: do_something(req))
-
-To signal an event, use leap.common.events.signal():
-
->>> from leap.common.events import signal
->>> from leap.common.events import events_pb2 as proto
->>> signal(proto.CLIENT_UID)
-
-
-NOTE ABOUT SYNC/ASYNC REQUESTS:
-
-Clients always communicate with the server, and never between themselves.
-When a client registers a callback for an event, the callback is saved locally
-in the client and the server stores the client socket port in a list
-associated with that event. When a client signals an event, the server
-forwards the signal to all registered client ports, and then each client
-executes its callbacks associated with that event locally.
-
-Each RPC call from a client to the server is followed by a response from the
-server to the client. Calls to register() and signal() (and all other RPC
-calls) can be synchronous or asynchronous meaning if they will or not wait for
-the server's response before returning.
-
-This mechanism was built on top of protobuf.socketrpc, and because of this RPC
-calls are made synchronous or asynchronous in the following way:
-
-  * If RPC calls receive a parameter called `reqcbk`, then the call is made
-    asynchronous. That means that:
+>>> from leap.common.events import catalog
+>>> register(catalog.CLIENT_UID, lambda sig, content: do_something(content))
 
-        - an eventual `timeout` parameter is not used,
-        - the call returns immediatelly with value None, and
-        - the `reqcbk` callback is executed asynchronously upon the arrival of
-          a response from the server.
+To emit an event, use leap.common.events.emit():
 
-  * Otherwise, if the `reqcbk` parameter is None, then the call is made in a
-    synchronous manner:
-
-        - if a response from server arrives within `timeout` milliseconds, the
-          RPC call returns it;
-        - otherwise, the call returns None.
+>>> from leap.common.events import emit
+>>> from leap.common.events import catalog
+>>> emit(catalog.CLIENT_UID)
 """
 
+
 import logging
-import socket
+import argparse
+
+from leap.common.events import client
+from leap.common.events import server
+from leap.common.events import catalog
 
 
-from leap.common.events import (
-    events_pb2 as proto,
-    server,
-    client,
-    daemon,
-)
+__all__ = [
+    "register",
+    "unregister",
+    "emit",
+    "catalog",
+]
 
 
 logger = logging.getLogger(__name__)
 
 
-def register(signal, callback, uid=None, replace=False, reqcbk=None,
-             timeout=1000):
+def register(event, callback, uid=None, replace=False):
     """
-    Register a callback to be called when the given signal is received.
-
-    Will timeout after timeout ms if response has not been received. The
-    timeout arg is only used for asynch requests. If a reqcbk callback has
-    been supplied the timeout arg is not used. The response value will be
-    returned for a synch request but nothing will be returned for an asynch
-    request.
-
-    :param signal: the signal that causes the callback to be launched
-    :type signal: int (see the `events.proto` file)
-    :param callback: the callback to be called when the signal is received
-    :type callback: function
-    :param uid: a unique id for the callback
-    :type uid: int
-    :param replace: should an existent callback with same uid be replaced?
+    Register a callback to be executed when an event is received.
+
+    :param event: The event that triggers the callback.
+    :type event: str
+    :param callback: The callback to be executed.
+    :type callback: callable(event, content)
+    :param uid: The callback uid.
+    :type uid: str
+    :param replace: Wether an eventual callback with same ID should be
+                    replaced.
     :type replace: bool
-    :param reqcbk: a callback to be called when a response from server is
-                   received
-    :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
-    :param timeout: the timeout for synch calls
-    :type timeout: int
-
-    :return: the response from server for synch calls or nothing for asynch
-        calls.
-    :rtype: leap.common.events.events_pb2.EventsResponse or None
-    """
-    return client.register(signal, callback, uid, replace, reqcbk, timeout)
 
+    :return: The callback uid.
+    :rtype: str
 
-def unregister(signal, uid=None, reqcbk=None, timeout=1000):
-    """
-    Unregister a callback.
-
-    If C{uid} is specified, unregisters only the callback identified by that
-    unique id. Otherwise, unregisters all callbacks registered for C{signal}.
-
-    :param signal: the signal that causes the callback to be launched
-    :type signal: int (see the `events.proto` file)
-    :param uid: a unique id for the callback
-    :type uid: int
-    :param reqcbk: a callback to be called when a response from server is
-                   received
-    :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse)
-    :param timeout: the timeout for synch calls
-    :type timeout: int
-
-    :return: the response from server for synch calls or nothing for asynch
-             calls.
-    :rtype: leap.common.events.events_pb2.EventsResponse or None
+    :raises CallbackAlreadyRegistered: when there's already a callback
+            identified by the given uid and replace is False.
     """
-    return client.unregister(signal, uid, reqcbk, timeout)
+    return client.register(event, callback, uid, replace)
 
 
-def signal(signal, content="", mac_method="", mac="", reqcbk=None,
-           timeout=1000):
+def unregister(event, uid=None):
     """
-    Send `signal` event to events server.
-
-    Will timeout after timeout ms if response has not been received. The
-    timeout arg is only used for asynch requests.  If a reqcbk callback has
-    been supplied the timeout arg is not used. The response value will be
-    returned for a synch request but nothing will be returned for an asynch
-    request.
-
-    :param signal: the signal that causes the callback to be launched
-    :type signal: int (see the `events.proto` file)
-    :param content: the contents of the event signal
-    :type content: str
-    :param mac_method: the method used to auth mac
-    :type mac_method: str
-    :param mac: the content of the auth mac
-    :type mac: str
-    :param reqcbk: a callback to be called when a response from server is
-                   received
-    :type reqcbk: function(proto.SignalRequest, proto.EventResponse)
-    :param timeout: the timeout for synch calls
-    :type timeout: int
-
-    :return: the response from server for synch calls or nothing for asynch
-             calls.
-    :rtype: leap.common.events.events_pb2.EventsResponse or None
-    """
-    return client.signal(signal, content, mac_method, mac, reqcbk, timeout)
+    Unregister callbacks for an event.
 
-def ping_client(port, reqcbk=None, timeout=1000):
-    """
-    Ping a client running in C{port}.
-
-    :param port: the port in which the client should be listening
-    :type port: int
-    :param reqcbk: a callback to be called when a response from client is
-                   received
-    :type reqcbk: function(proto.PingRequest, proto.EventResponse)
-    :param timeout: the timeout for synch calls
-    :type timeout: int
+    If uid is not None, then only the callback identified by the given uid is
+    removed. Otherwise, all callbacks for the event are removed.
+
+    :param event: The event that triggers the callback.
+    :type event: Event
+    :param uid: The callback uid.
+    :type uid: str
     """
-    return client.ping(port, reqcbk=reqcbk, timeout=timeout)
+    return client.unregister(event, uid)
 
 
-def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000):
+def emit(event, *content):
     """
-    Ping the server.
-
-    :param port: the port in which server should be listening
-    :type port: int
-    :param reqcbk: a callback to be called when a response from server is
-                   received
-    :type reqcbk: function(proto.PingRequest, proto.EventResponse)
-    :param timeout: the timeout for synch calls
-    :type timeout: int
+    Send an event.
+
+    :param event: The event to be sent.
+    :type event: Event
+    :param content: The content of the event.
+    :type content: list
     """
-    return server.ping(port, reqcbk=reqcbk, timeout=timeout)
+    return client.emit(event, *content)
+
+
+if __name__ == "__main__":
+
+    def _echo(event, *content):
+        print "Received event: (%s, %s)" % (event, content)
+
+    def _parse_args():
+        parser = argparse.ArgumentParser()
+        parser.add_argument(
+            "--debug", "-d", action="store_true",
+            help="print debug information")
+
+        subparsers = parser.add_subparsers(dest="command")
+
+        # server options
+        server_parser = subparsers.add_parser(
+            "server", help="Run an events server.")
+        server_parser.add_argument(
+            "--emit-addr",
+            help="The address in which to listen for events",
+            default=server.EMIT_ADDR)
+        server_parser.add_argument(
+            "--reg-addr",
+            help="The address in which to listen for registration for events.",
+            default=server.REG_ADDR)
+
+        # client options
+        client_parser = subparsers.add_parser(
+            "client", help="Run an events client.")
+        client_parser.add_argument(
+            "--emit-addr",
+            help="The address in which to emit events.",
+            default=server.EMIT_ADDR)
+        client_parser.add_argument(
+            "--reg-addr",
+            help="The address in which to register for events.",
+            default=server.REG_ADDR)
+        group = client_parser.add_mutually_exclusive_group(required=True)
+        group.add_argument('--reg', help="register an event")
+        group.add_argument('--emit', help="send an event")
+        client_parser.add_argument(
+            '--content', help="the content of the event", default=None)
+
+        # txclient options
+        txclient_parser = subparsers.add_parser(
+            "txclient", help="Run an events twisted client.")
+        txclient_parser.add_argument(
+            "--emit-addr",
+            help="The address in which to emit events.",
+            default=server.EMIT_ADDR)
+        txclient_parser.add_argument(
+            "--reg-addr",
+            help="The address in which to register for events.",
+            default=server.REG_ADDR)
+        group = txclient_parser.add_mutually_exclusive_group(required=True)
+        group.add_argument('--reg', help="register an event")
+        group.add_argument('--emit', help="send an event")
+        txclient_parser.add_argument(
+            '--content', help="the content of the event", default=None)
+
+        return parser.parse_args()
+
+    args = _parse_args()
+
+    if args.debug:
+        logging.basicConfig(level=logging.DEBUG)
+
+    if args.command == "server":
+        # run server
+        server.ensure_server(emit_addr=args.emit_addr, reg_addr=args.reg_addr)
+        from twisted.internet import reactor
+        reactor.run()
+    elif args.command == "client":
+        if args.reg:
+            event = getattr(catalog, args.reg)
+            # run client and register to a signal
+            register(event, _echo)
+            # make sure we stop on CTRL+C
+            import signal
+            signal.signal(
+                signal.SIGINT, lambda sig, frame: client.shutdown())
+            # wait until client thread dies
+            import time
+            while client.EventsClientThread.instance().is_alive():
+                time.sleep(0.1)
+        if args.emit:
+            # run client and emit a signal
+            event = getattr(catalog, args.emit)
+            emit(event, args.content)
+            client.shutdown()
+    elif args.command == "txclient":
+        from leap.common.events import txclient
+        register = txclient.register
+        emit = txclient.emit
+        if args.reg:
+            event = getattr(catalog, args.reg)
+            # run client and register to a signal
+            register(event, _echo)
+            from twisted.internet import reactor
+            reactor.run()
+        if args.emit:
+            # run client and emit a signal
+            event = getattr(catalog, args.emit)
+            emit(event, args.content)
diff --git a/src/leap/common/events/catalog.py b/src/leap/common/events/catalog.py
new file mode 100644 (file)
index 0000000..8bddd2c
--- /dev/null
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+# catalog.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful",
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Events catalog.
+"""
+
+
+EVENTS = [
+    "CLIENT_SESSION_ID",
+    "CLIENT_UID",
+    "IMAP_CLIENT_LOGIN",
+    "IMAP_SERVICE_FAILED_TO_START",
+    "IMAP_SERVICE_STARTED",
+    "IMAP_UNHANDLED_ERROR",
+    "KEYMANAGER_DONE_UPLOADING_KEYS",
+    "KEYMANAGER_FINISHED_KEY_GENERATION",
+    "KEYMANAGER_KEY_FOUND",
+    "KEYMANAGER_KEY_NOT_FOUND",
+    "KEYMANAGER_LOOKING_FOR_KEY",
+    "KEYMANAGER_STARTED_KEY_GENERATION",
+    "MAIL_FETCHED_INCOMING",
+    "MAIL_MSG_DECRYPTED",
+    "MAIL_MSG_DELETED_INCOMING",
+    "MAIL_MSG_PROCESSING",
+    "MAIL_MSG_SAVED_LOCALLY",
+    "MAIL_UNREAD_MESSAGES",
+    "RAISE_WINDOW",
+    "SMTP_CONNECTION_LOST",
+    "SMTP_END_ENCRYPT_AND_SIGN",
+    "SMTP_END_SIGN",
+    "SMTP_RECIPIENT_ACCEPTED_ENCRYPTED",
+    "SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED",
+    "SMTP_RECIPIENT_REJECTED",
+    "SMTP_SEND_MESSAGE_ERROR",
+    "SMTP_SEND_MESSAGE_START",
+    "SMTP_SEND_MESSAGE_SUCCESS",
+    "SMTP_SERVICE_FAILED_TO_START",
+    "SMTP_SERVICE_STARTED",
+    "SMTP_START_ENCRYPT_AND_SIGN",
+    "SMTP_START_SIGN",
+    "SOLEDAD_CREATING_KEYS",
+    "SOLEDAD_DONE_CREATING_KEYS",
+    "SOLEDAD_DONE_DATA_SYNC",
+    "SOLEDAD_DONE_DOWNLOADING_KEYS",
+    "SOLEDAD_DONE_UPLOADING_KEYS",
+    "SOLEDAD_DOWNLOADING_KEYS",
+    "SOLEDAD_INVALID_AUTH_TOKEN",
+    "SOLEDAD_NEW_DATA_TO_SYNC",
+    "SOLEDAD_SYNC_RECEIVE_STATUS",
+    "SOLEDAD_SYNC_SEND_STATUS",
+    "SOLEDAD_UPLOADING_KEYS",
+    "UPDATER_DONE_UPDATING",
+    "UPDATER_NEW_UPDATES",
+]
+
+
+class Event(object):
+
+    def __init__(self, label):
+        self.label = label
+
+    def __repr__(self):
+        return '<Event: %s>' % self.label
+
+    def __str__(self):
+        return self.label
+
+
+# create local variables based on the event list above
+lcl = locals()
+for event in EVENTS:
+    lcl[event] = Event(event)
index 83f18e0..6b234a1 100644 (file)
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # client.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014, 2015 LEAP
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -14,6 +14,8 @@
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
 """
 The client end point of the events mechanism.
 
@@ -21,309 +23,501 @@ Clients are the communicating parties of the events mechanism. They
 communicate by sending messages to a server, which in turn redistributes
 messages to other clients.
 
-When a client registers a callback for a given signal, it also tells the
-server that it wants to be notified whenever signals of that type are sent by
+When a client registers a callback for a given event, it also tells the
+server that it wants to be notified whenever events of that type are sent by
 some other client.
 """
 
-import logging
 
+import logging
+import collections
+import uuid
+import threading
+import time
+import pickle
+import os
+
+from abc import ABCMeta
+from abc import abstractmethod
+
+import zmq
+from zmq.eventloop import zmqstream
+from zmq.eventloop import ioloop
+
+# XXX some distros don't package libsodium, so we have to be prepared for
+#     absence of zmq.auth
+try:
+    import zmq.auth
+except ImportError:
+    pass
 
-from protobuf.socketrpc import RpcService
+from leap.common.config import get_path_prefix
+from leap.common.zmq_utils import zmq_has_curve
+from leap.common.zmq_utils import maybe_create_and_get_certificates
+from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
 
-from leap.common.events import events_pb2 as proto
-from leap.common.events import server
-from leap.common.events import daemon
-from leap.common.events import mac_auth
+from leap.common.events.errors import CallbackAlreadyRegisteredError
+from leap.common.events.server import EMIT_ADDR
+from leap.common.events.server import REG_ADDR
+from leap.common.events import catalog
 
 
 logger = logging.getLogger(__name__)
 
 
-# the `registered_callbacks` dictionary below should have the following
-# format:
-#
-#     { event_signal: [ (uid, callback), ... ], ... }
-#
-registered_callbacks = {}
+_emit_addr = EMIT_ADDR
+_reg_addr = REG_ADDR
+
 
+def configure_client(emit_addr, reg_addr):
+    global _emit_addr, _reg_addr
+    logger.debug("Configuring client with addresses: (%s, %s)" %
+                 (emit_addr, reg_addr))
+    _emit_addr = emit_addr
+    _reg_addr = reg_addr
 
-class CallbackAlreadyRegistered(Exception):
+
+class EventsClient(object):
     """
-    Raised when trying to register an already registered callback.
+    A singleton client for the events mechanism.
     """
-    pass
 
+    __metaclass__ = ABCMeta
 
-def ensure_client_daemon():
-    """
-    Ensure the client daemon is running and listening for incoming
-    messages.
+    _instance = None
+    _instance_lock = threading.Lock()
 
-    :return: the daemon instance
-    :rtype: EventsClientDaemon
-    """
-    import time
-    daemon = EventsClientDaemon.ensure(0)
-    logger.debug('ensure client daemon')
+    def __init__(self, emit_addr, reg_addr):
+        """
+        Initialize the events client.
+        """
+        logger.debug("Creating client instance.")
+        self._callbacks = collections.defaultdict(dict)
+        self._emit_addr = emit_addr
+        self._reg_addr = reg_addr
+
+    @property
+    def callbacks(self):
+        return self._callbacks
 
-    # Because we use a random port we want to wait until a port is assigned to
-    # local client daemon.
+    @classmethod
+    def instance(cls):
+        """
+        Return a singleton EventsClient instance.
+        """
+        with cls._instance_lock:
+            if cls._instance is None:
+                cls._instance = cls(_emit_addr, _reg_addr)
+        return cls._instance
 
-    while not (EventsClientDaemon.get_instance() and
-               EventsClientDaemon.get_instance().get_port()):
-        time.sleep(0.1)
-    return daemon
+    def register(self, event, callback, uid=None, replace=False):
+        """
+        Register a callback to be executed when an event is received.
+
+        :param event: The event that triggers the callback.
+        :type event: Event
+        :param callback: The callback to be executed.
+        :type callback: callable(event, *content)
+        :param uid: The callback uid.
+        :type uid: str
+        :param replace: Wether an eventual callback with same ID should be
+                        replaced.
+        :type replace: bool
+
+        :return: The callback uid.
+        :rtype: str
+
+        :raises CallbackAlreadyRegisteredError: when there's already a callback
+                identified by the given uid and replace is False.
+        """
+        logger.debug("Subscribing to event: %s" % event)
+        if not uid:
+            uid = uuid.uuid4()
+        elif uid in self._callbacks[event] and not replace:
+            raise CallbackAlreadyRegisteredError()
+        self._callbacks[event][uid] = callback
+        self._subscribe(str(event))
+        return uid
+
+    def unregister(self, event, uid=None):
+        """
+        Unregister callbacks for an event.
 
+        If uid is not None, then only the callback identified by the given uid
+        is removed. Otherwise, all callbacks for the event are removed.
 
-def register(signal, callback, uid=None, replace=False, reqcbk=None,
-             timeout=1000):
-    """
-    Registers a callback to be called when a specific signal event is
-    received.
-
-    Will timeout after timeout ms if response has not been received. The
-    timeout arg is only used for asynch requests. If a reqcbk callback has
-    been supplied the timeout arg is not used. The response value will be
-    returned for a synch request but nothing will be returned for an asynch
-    request.
-
-    :param signal: the signal that causes the callback to be launched
-    :type signal: int (see the `events.proto` file)
-    :param callback: the callback to be called when the signal is received
-    :type callback: function(leap.common.events.events_pb2.SignalRequest)
-    :param uid: a unique id for the callback
-    :type uid: int
-    :param replace: should an existent callback with same uid be replaced?
-    :type replace: bool
-    :param reqcbk: a callback to be called when a response from server is
-                   received
-    :type reqcbk: function(proto.RegisterRequest, proto.EventResponse)
-    :param timeout: the timeout for synch calls
-    :type timeout: int
-
-    Might raise a CallbackAlreadyRegistered exception if there's already a
-    callback identified by the given uid and replace is False.
-
-    :return: the response from server for synch calls or nothing for asynch
-             calls.
-    :rtype: leap.common.events.events_pb2.EventsResponse or None
-    """
-    ensure_client_daemon()  # so we can receive registered signals
-    # register callback locally
-    if signal not in registered_callbacks:
-        registered_callbacks[signal] = []
-    cbklist = registered_callbacks[signal]
-
-    # TODO should check that the callback has the right
-    # number of arguments.
-
-    if uid and filter(lambda (x, y): x == uid, cbklist):
-        if not replace:
-            raise CallbackAlreadyRegistered()
+        :param event: The event that triggers the callback.
+        :type event: Event
+        :param uid: The callback uid.
+        :type uid: str
+        """
+        if not uid:
+            logger.debug(
+                "Unregistering all callbacks from event %s." % event)
+            self._callbacks[event] = {}
         else:
-            registered_callbacks[signal] = filter(lambda(x, y): x != uid,
-                                                  cbklist)
-    registered_callbacks[signal].append((uid, callback))
-    # register callback on server
-    request = proto.RegisterRequest()
-    request.event = signal
-    request.port = EventsClientDaemon.get_instance().get_port()
-    request.mac_method = mac_auth.MacMethod.MAC_NONE
-    request.mac = ""
-    service = RpcService(proto.EventsServerService_Stub,
-                         server.SERVER_PORT, 'localhost')
-    logger.debug(
-        "Sending registration request to server on port %s: %s",
-        server.SERVER_PORT,
-        str(request)[:40])
-    return service.register(request, callback=reqcbk, timeout=timeout)
-
-
-def unregister(signal, uid=None, reqcbk=None, timeout=1000):
-    """
-    Unregister a callback.
-
-    If C{uid} is specified, unregisters only the callback identified by that
-    unique id. Otherwise, unregisters all callbacks
-
-    :param signal: the signal that causes the callback to be launched
-    :type signal: int (see the `events.proto` file)
-    :param uid: a unique id for the callback
-    :type uid: int
-    :param reqcbk: a callback to be called when a response from server is
-                   received
-    :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse)
-    :param timeout: the timeout for synch calls
-    :type timeout: int
-
-    :return: the response from server for synch calls or nothing for asynch
-             calls or None if no callback is registered for that signal or
-             uid.
-    :rtype: leap.common.events.events_pb2.EventsResponse or None
-    """
-    if signal not in registered_callbacks or not registered_callbacks[signal]:
-        logger.warning("No callback registered for signal %d." % signal)
-        return None
-    # unregister callback locally
-    cbklist = registered_callbacks[signal]
-    if uid is not None:
-        if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []:
-            logger.warning("No callback registered for uid %d." % st)
-            return None
-        registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist)
-    else:
-        # exclude all callbacks for given signal
-        registered_callbacks[signal] = []
-    # unregister port in server if there are no more callbacks for this signal
-    if not registered_callbacks[signal]:
-        request = proto.UnregisterRequest()
-        request.event = signal
-        request.port = EventsClientDaemon.get_instance().get_port()
-        request.mac_method = mac_auth.MacMethod.MAC_NONE
-        request.mac = ""
-        service = RpcService(proto.EventsServerService_Stub,
-                             server.SERVER_PORT, 'localhost')
-        logger.info(
-            "Sending unregistration request to server on port %s: %s",
-            server.SERVER_PORT,
-            str(request)[:40])
-        return service.unregister(request, callback=reqcbk, timeout=timeout)
-
-
-def signal(signal, content="", mac_method="", mac="", reqcbk=None,
-           timeout=1000):
-    """
-    Send `signal` event to events server.
-
-    Will timeout after timeout ms if response has not been received. The
-    timeout arg is only used for asynch requests.  If a reqcbk callback has
-    been supplied the timeout arg is not used. The response value will be
-    returned for a synch request but nothing will be returned for an asynch
-    request.
-
-    :param signal: the signal that causes the callback to be launched
-    :type signal: int (see the `events.proto` file)
-    :param content: the contents of the event signal
-    :type content: str
-    :param mac_method: the method used for auth mac
-    :type mac_method: str
-    :param mac: the content of the auth mac
-    :type mac: str
-    :param reqcbk: a callback to be called when a response from server is
-                   received
-    :type reqcbk: function(proto.SignalRequest, proto.EventResponse)
-    :param timeout: the timeout for synch calls
-    :type timeout: int
-
-    :return: the response from server for synch calls or nothing for asynch
-             calls.
-    :rtype: leap.common.events.events_pb2.EventsResponse or None
-    """
-    request = proto.SignalRequest()
-    request.event = signal
-    request.content = content
-    request.mac_method = mac_method
-    request.mac = mac
-    service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT,
-                         'localhost')
-    logger.debug("Sending signal to server: %s", str(request)[:40])
-    return service.signal(request, callback=reqcbk, timeout=timeout)
-
-
-def ping(port, reqcbk=None, timeout=1000):
+            logger.debug(
+                "Unregistering callback %s from event %s." % (uid, event))
+            if uid in self._callbacks[event]:
+                del self._callbacks[event][uid]
+        if not self._callbacks[event]:
+            del self._callbacks[event]
+            self._unsubscribe(str(event))
+
+    def emit(self, event, *content):
+        """
+        Send an event.
+
+        :param event: The event to be sent.
+        :type event: Event
+        :param content: The content of the event.
+        :type content: list
+        """
+        logger.debug("Sending event: (%s, %s)" % (event, content))
+        self._send(str(event) + b'\0' + pickle.dumps(content))
+
+    def _handle_event(self, event, content):
+        """
+        Handle an incoming event.
+
+        :param msg: The incoming message.
+        :type msg: list(str)
+        """
+        logger.debug("Handling event %s..." % event)
+        for uid in self._callbacks[event]:
+            callback = self._callbacks[event][uid]
+            logger.debug("Executing callback %s." % uid)
+            callback(event, *content)
+
+    @abstractmethod
+    def _subscribe(self, tag):
+        """
+        Subscribe to a tag on the zmq SUB socket.
+
+        :param tag: The tag to be subscribed.
+        :type tag: str
+        """
+        pass
+
+    @abstractmethod
+    def _unsubscribe(self, tag):
+        """
+        Unsubscribe from a tag on the zmq SUB socket.
+
+        :param tag: The tag to be unsubscribed.
+        :type tag: str
+        """
+        pass
+
+    @abstractmethod
+    def _send(self, data):
+        """
+        Send data through PUSH socket.
+
+        :param data: The data to be sent.
+        :type event: str
+        """
+        pass
+
+    def shutdown(self):
+        self.__class__.reset()
+
+    @classmethod
+    def reset(cls):
+        with cls._instance_lock:
+            cls._instance = None
+
+
+class EventsIOLoop(ioloop.ZMQIOLoop):
     """
-    Ping a client running in C{port}.
-
-    :param port: the port in which the client should be listening
-    :type port: int
-    :param reqcbk: a callback to be called when a response from client is
-                   received
-    :type reqcbk: function(proto.PingRequest, proto.EventResponse)
-    :param timeout: the timeout for synch calls
-    :type timeout: int
-
-    :return: the response from client for synch calls or nothing for asynch
-             calls.
-    :rtype: leap.common.events.events_pb2.EventsResponse or None
+    An extension of zmq's ioloop that can wait until there are no callbacks
+    in the queue before stopping.
     """
-    request = proto.PingRequest()
-    service = RpcService(
-        proto.EventsClientService_Stub,
-        port,
-        'localhost')
-    logger.debug("Pinging a client in port %d..." % port)
-    return service.ping(request, callback=reqcbk, timeout=timeout)
 
+    def stop(self, wait=False):
+        """
+        Stop the I/O loop.
 
-class EventsClientService(proto.EventsClientService):
+        :param wait: Whether we should wait for callbacks in queue to finish
+                     before stopping.
+        :type wait: bool
+        """
+        if wait:
+            # prevent new callbacks from being added
+            with self._callback_lock:
+                self._closing = True
+            # wait until all callbacks have been executed
+            while self._callbacks:
+                time.sleep(0.1)
+        ioloop.ZMQIOLoop.stop(self)
+
+
+class EventsClientThread(threading.Thread, EventsClient):
     """
-    Service for receiving signal events in clients.
+    A threaded version of the events client.
     """
 
-    def __init__(self):
-        proto.EventsClientService.__init__(self)
+    def __init__(self, emit_addr, reg_addr):
+        """
+        Initialize the events client.
+        """
+        threading.Thread.__init__(self)
+        EventsClient.__init__(self, emit_addr, reg_addr)
+        self._lock = threading.Lock()
+        self._initialized = threading.Event()
+        self._config_prefix = os.path.join(
+            get_path_prefix(), "leap", "events")
+        self._loop = None
+        self._context = None
+        self._push = None
+        self._sub = None
+
+    def _init_zmq(self):
+        """
+        Initialize ZMQ connections.
+        """
+        self._loop = EventsIOLoop()
+        self._context = zmq.Context()
+        # connect SUB first, otherwise we might miss some event sent from this
+        # same client
+        self._sub = self._zmq_connect_sub()
+        self._push = self._zmq_connect_push()
+
+    def _zmq_connect(self, socktype, address):
+        """
+        Connect to an address using with a zmq socktype.
+
+        :param socktype: The ZMQ socket type.
+        :type socktype: int
+        :param address: The address to connect to.
+        :type address: str
 
-    def signal(self, controller, request, done):
+        :return: A ZMQ connection stream.
+        :rtype: ZMQStream
         """
-        Receive a signal and run callbacks registered for that signal.
+        logger.debug("Connecting %s to %s." % (socktype, address))
+        socket = self._context.socket(socktype)
+        # configure curve authentication
+        if zmq_has_curve():
+            public, private = maybe_create_and_get_certificates(
+                self._config_prefix, "client")
+            server_public_file = os.path.join(
+                self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
+            server_public, _ = zmq.auth.load_certificate(server_public_file)
+            socket.curve_publickey = public
+            socket.curve_secretkey = private
+            socket.curve_serverkey = server_public
+        stream = zmqstream.ZMQStream(socket, self._loop)
+        socket.connect(address)
+        return stream
+
+    def _zmq_connect_push(self):
+        """
+        Initialize the client's PUSH connection.
 
-        This method is called whenever a signal request is received from
-        server.
+        :return: A ZMQ connection stream.
+        :rtype: ZMQStream
+        """
+        return self._zmq_connect(zmq.PUSH, self._emit_addr)
 
-        :param controller: used to mediate a single method call
-        :type controller: protobuf.socketrpc.controller.SocketRpcController
-        :param request: the request received from the client
-        :type request: leap.common.events.events_pb2.SignalRequest
-        :param done: callback to be called when done
-        :type done: protobuf.socketrpc.server.Callback
+    def _zmq_connect_sub(self):
         """
-        logger.debug('Received signal from server: %s...' % str(request)[:40])
+        Initialize the client's SUB connection.
 
-        # run registered callbacks
-        # TODO: verify authentication using mac in incoming message
-        if request.event in registered_callbacks:
-            for (_, cbk) in registered_callbacks[request.event]:
-                # callbacks should be prepared to receive a
-                # events_pb2.SignalRequest.
-                cbk(request)
+        :return: A ZMQ connection stream.
+        :rtype: ZMQStream
+        """
+        stream = self._zmq_connect(zmq.SUB, self._reg_addr)
+        stream.on_recv(self._on_recv)
+        return stream
 
-        # send response back to server
-        response = proto.EventResponse()
-        response.status = proto.EventResponse.OK
-        done.run(response)
+    def _on_recv(self, msg):
+        """
+        Handle an incoming message in the SUB socket.
 
-    def ping(self, controller, request, done):
+        :param msg: The received message.
+        :type msg: str
         """
-        Reply to a ping request.
+        ev_str, content_pickle = msg[0].split(b'\0', 1)  # undo txzmq tagging
+        event = getattr(catalog, ev_str)
+        content = pickle.loads(content_pickle)
+        self._handle_event(event, content)
 
-        :param controller: used to mediate a single method call
-        :type controller: protobuf.socketrpc.controller.SocketRpcController
-        :param request: the request received from the client
-        :type request: leap.common.events.events_pb2.RegisterRequest
-        :param done: callback to be called when done
-        :type done: protobuf.socketrpc.server.Callback
+    def _subscribe(self, tag):
         """
-        logger.debug("Received ping request, sending response.")
-        response = proto.EventResponse()
-        response.status = proto.EventResponse.OK
-        done.run(response)
+        Subscribe from a tag on the zmq SUB socket.
 
+        :param tag: The tag to be subscribed.
+        :type tag: str
+        """
+        self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)
 
-class EventsClientDaemon(daemon.EventsSingletonDaemon):
-    """
-    A daemon that listens for incoming events from server.
-    """
 
-    @classmethod
-    def ensure(cls, port):
+    def _unsubscribe(self, tag):
+        """
+        Unsubscribe from a tag on the zmq SUB socket.
+
+        :param tag: The tag to be unsubscribed.
+        :type tag: str
+        """
+        self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)
+
+    def _send(self, data):
+        """
+        Send data through PUSH socket.
+
+        :param data: The data to be sent.
+        :type event: str
+        """
+        logger.debug("Sending data: %s" % data)
+        # add send() as a callback for ioloop so it works between threads
+        self._loop.add_callback(lambda: self._push.send(data))
+
+    def register(self, event, callback, uid=None, replace=False):
+        """
+        Register a callback to be executed when an event is received.
+
+        :param event: The event that triggers the callback.
+        :type event: Event
+        :param callback: The callback to be executed.
+        :type callback: callable(event, *content)
+        :param uid: The callback uid.
+        :type uid: str
+        :param replace: Wether an eventual callback with same ID should be
+                        replaced.
+        :type replace: bool
+
+        :return: The callback uid.
+        :rtype: str
+
+        :raises CallbackAlreadyRegisteredError: when there's already a
+                callback identified by the given uid and replace is False.
         """
-        Make sure the daemon is running on the given port.
+        self.ensure_client()
+        return EventsClient.register(self, event, callback, uid=uid, replace=replace)
 
-        :param port: the port in which the daemon should listen
-        :type port: int
+    def unregister(self, event, uid=None):
+        """
+        Unregister callbacks for an event.
+
+        If uid is not None, then only the callback identified by the given uid
+        is removed. Otherwise, all callbacks for the event are removed.
+
+        :param event: The event that triggers the callback.
+        :type event: Event
+        :param uid: The callback uid.
+        :type uid: str
+        """
+        self.ensure_client()
+        EventsClient.unregister(self, event, uid=uid)
+
+    def emit(self, event, *content):
+        """
+        Send an event.
+
+        :param event: The event to be sent.
+        :type event: Event
+        :param content: The content of the event.
+        :type content: list
+        """
+        self.ensure_client()
+        EventsClient.emit(self, event, *content)
+
+    def run(self):
+        """
+        Run the events client.
+        """
+        logger.debug("Starting ioloop.")
+        self._init_zmq()
+        self._initialized.set()
+        self._loop.start()
+        self._loop.close()
+        logger.debug("Ioloop finished.")
+
+    def ensure_client(self):
+        """
+        Make sure the events client thread is started.
+        """
+        with self._lock:
+            if not self.is_alive():
+                self.daemon = True
+                self.start()
+                self._initialized.wait()
 
-        :return: a daemon instance
-        :rtype: EventsClientDaemon
+    def shutdown(self):
         """
-        return cls.ensure_service(port, EventsClientService())
+        Shutdown the events client thread.
+        """
+        logger.debug("Shutting down client...")
+        with self._lock:
+            if self.is_alive():
+                self._loop.stop(wait=True)
+        EventsClient.shutdown(self)
+
+
+def shutdown():
+    """
+    Shutdown the events client thread.
+    """
+    EventsClientThread.instance().shutdown()
+
+
+def register(event, callback, uid=None, replace=False):
+    """
+    Register a callback to be executed when an event is received.
+
+    :param event: The event that triggers the callback.
+    :type event: str
+    :param callback: The callback to be executed.
+    :type callback: callable(event, content)
+    :param uid: The callback uid.
+    :type uid: str
+    :param replace: Wether an eventual callback with same ID should be
+                    replaced.
+    :type replace: bool
+
+    :return: The callback uid.
+    :rtype: str
+
+    :raises CallbackAlreadyRegisteredError: when there's already a callback
+            identified by the given uid and replace is False.
+    """
+    return EventsClientThread.instance().register(
+        event, callback, uid=uid, replace=replace)
+
+
+def unregister(event, uid=None):
+    """
+    Unregister callbacks for an event.
+
+    If uid is not None, then only the callback identified by the given uid is
+    removed. Otherwise, all callbacks for the event are removed.
+
+    :param event: The event that triggers the callback.
+    :type event: str
+    :param uid: The callback uid.
+    :type uid: str
+    """
+    return EventsClientThread.instance().unregister(event, uid=uid)
+
+
+def emit(event, *content):
+    """
+    Send an event.
+
+    :param event: The event to be sent.
+    :type event: str
+    :param content: The content of the event.
+    :type content: list
+    """
+    return EventsClientThread.instance().emit(event, *content)
+
+
+def instance():
+    """
+    Return an instance of the events client.
+
+    :return: An instance of the events client.
+    :rtype: EventsClientThread
+    """
+    return EventsClientThread.instance()
diff --git a/src/leap/common/events/daemon.py b/src/leap/common/events/daemon.py
deleted file mode 100644 (file)
index c4a4189..0000000
+++ /dev/null
@@ -1,208 +0,0 @@
-# -*- coding: utf-8 -*-
-# daemon.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-A singleton daemon for running RPC services using protobuf.socketrpc.
-"""
-
-
-import logging
-import threading
-
-
-from protobuf.socketrpc.server import (
-    SocketRpcServer,
-    ThreadedTCPServer,
-    SocketHandler,
-)
-
-
-logger = logging.getLogger(__name__)
-
-
-class ServiceAlreadyRunningException(Exception):
-    """
-    Raised whenever a service is already running in this process but someone
-    attemped to start it in a different port.
-    """
-
-
-class EventsRpcServer(SocketRpcServer):
-    """
-    RPC server used in server and client interfaces to receive messages.
-    """
-
-    def __init__(self, port, host='localhost'):
-        """
-        Initialize a RPC server.
-
-        :param port: the port in which to listen for incoming messages
-        :type port: int
-        :param host: the address to bind to
-        :type host: str
-        """
-        SocketRpcServer.__init__(self, port, host)
-        self._server = None
-
-    def run(self):
-        """
-        Run the server.
-        """
-        logger.info('Running server on port %d.' % self.port)
-        # parent implementation does not hold the server instance, so we do it
-        # here.
-        self._server = ThreadedTCPServer((self.host, self.port),
-                                         SocketHandler, self)
-        # if we chose to use a random port, fetch the port number info.
-        if self.port is 0:
-            self.port = self._server.socket.getsockname()[1]
-        self._server.serve_forever()
-
-    def stop(self):
-        """
-        Stop the server.
-        """
-        self._server.shutdown()
-
-
-class EventsSingletonDaemon(threading.Thread):
-    """
-    Singleton class for for launching and terminating a daemon.
-
-    This class is used so every part of the mechanism that needs to listen for
-    messages can launch its own daemon (thread) to do the job.
-    """
-
-    # Singleton instance
-    __instance = None
-
-    def __new__(cls, *args, **kwargs):
-        """
-        Return a singleton instance if it exists or create and initialize one.
-        """
-        if len(args) is not 2:
-            raise TypeError("__init__() takes exactly 2 arguments (%d given)"
-                            % len(args))
-        if cls.__instance is None:
-            cls.__instance = object.__new__(
-                EventsSingletonDaemon)
-            cls.__initialize(cls.__instance, args[0], args[1])
-        return cls.__instance
-
-    @staticmethod
-    def __initialize(self, port, service):
-        """
-        Initialize a singleton daemon.
-
-        This is a static method disguised as instance method that actually
-        does the initialization of the daemon instance.
-
-        :param port: the port in which to listen for incoming messages
-        :type port: int
-        :param service: the service to provide in this daemon
-        :type service: google.protobuf.service.Service
-        """
-        threading.Thread.__init__(self)
-        self._port = port
-        self._service = service
-        self._server = EventsRpcServer(self._port)
-        self._server.registerService(self._service)
-        self.daemon = True
-
-    def __init__(self):
-        """
-        Singleton placeholder initialization method.
-
-        Initialization is made in __new__ so we can always return the same
-        instance upon object creation.
-        """
-        pass
-
-    @classmethod
-    def ensure(cls, port):
-        """
-        Make sure the daemon instance is running.
-
-        Each implementation of this method should call `self.ensure_service`
-        with the appropriate service from the `events.proto` definitions, and
-        return the daemon instance.
-
-        :param port: the port in which the daemon should be listening
-        :type port: int
-
-        :return: a daemon instance
-        :rtype: EventsSingletonDaemon
-        """
-        raise NotImplementedError(self.ensure)
-
-    @classmethod
-    def ensure_service(cls, port, service):
-        """
-        Start the singleton instance if not already running.
-
-        Might return ServiceAlreadyRunningException
-
-        :param port: the port in which the daemon should be listening
-        :type port: int
-
-        :return: a daemon instance
-        :rtype: EventsSingletonDaemon
-        """
-        daemon = cls(port, service)
-        if not daemon.is_alive():
-            daemon.start()
-        elif port and port != cls.__instance._port:
-            # service is running in this process but someone is trying to
-            # start it in another port
-            raise ServiceAlreadyRunningException(
-                "Service is already running in this process on port %d."
-                % self.__instance._port)
-        return daemon
-
-    @classmethod
-    def get_instance(cls):
-        """
-        Retrieve singleton instance of this daemon.
-
-        :return: a daemon instance
-        :rtype: EventsSingletonDaemon
-        """
-        return cls.__instance
-
-    def run(self):
-        """
-        Run the server.
-        """
-        self._server.run()
-
-    def stop(self):
-        """
-        Stop the daemon.
-        """
-        self._server.stop()
-
-    def get_port(self):
-        """
-        Retrieve the value of the port to which the service running in this
-        daemon is binded to.
-
-        :return: the port to which the daemon is binded to
-        :rtype: int
-        """
-        if self._port is 0:
-            self._port = self._server.port
-        return self._port
similarity index 73%
rename from src/leap/common/events/mac_auth.py
rename to src/leap/common/events/errors.py
index 49d48f7..58e0014 100644 (file)
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
-# mac_auth.py
-# Copyright (C) 2013 LEAP
+# errors.py
+# Copyright (C) 2015 LEAP
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-"""
-Authentication system for events.
 
-This is not implemented yet.
-"""
-
-
-class MacMethod(object):
+class CallbackAlreadyRegisteredError(Exception):
     """
-    Representation of possible MAC authentication methods.
+    Raised when trying to register an already registered callback.
     """
-
-    MAC_NONE = 'none'
-    MAC_HMAC = 'hmac'
+    pass
diff --git a/src/leap/common/events/events.proto b/src/leap/common/events/events.proto
deleted file mode 100644 (file)
index 2371b2a..0000000
+++ /dev/null
@@ -1,145 +0,0 @@
-// signal.proto
-// Copyright (C) 2013 LEAP
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package leap.common.events;
-option py_generic_services = true;
-
-
-// These are the events that can be signaled using the events mechanism.
-
-enum Event {
-  CLIENT_SESSION_ID = 1;
-  CLIENT_UID = 2;
-  SOLEDAD_CREATING_KEYS = 3;
-  SOLEDAD_DONE_CREATING_KEYS = 4;
-  SOLEDAD_UPLOADING_KEYS = 5;
-  SOLEDAD_DONE_UPLOADING_KEYS = 6;
-  SOLEDAD_DOWNLOADING_KEYS = 7;
-  SOLEDAD_DONE_DOWNLOADING_KEYS = 8;
-  SOLEDAD_NEW_DATA_TO_SYNC = 9;
-  SOLEDAD_DONE_DATA_SYNC = 10;
-  UPDATER_NEW_UPDATES = 11;
-  UPDATER_DONE_UPDATING = 12;
-  RAISE_WINDOW = 13;
-  SMTP_SERVICE_STARTED = 14;
-  SMTP_SERVICE_FAILED_TO_START = 15;
-  SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16;
-  SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17;
-  SMTP_RECIPIENT_REJECTED = 18;
-  SMTP_START_ENCRYPT_AND_SIGN = 19;
-  SMTP_END_ENCRYPT_AND_SIGN = 20;
-  SMTP_START_SIGN = 21;
-  SMTP_END_SIGN = 22;
-  SMTP_SEND_MESSAGE_START = 23;
-  SMTP_SEND_MESSAGE_SUCCESS = 24;
-  SMTP_SEND_MESSAGE_ERROR = 25;
-  SMTP_CONNECTION_LOST = 26;
-  IMAP_SERVICE_STARTED = 30;
-  IMAP_SERVICE_FAILED_TO_START = 31;
-  IMAP_CLIENT_LOGIN = 32;
-  IMAP_FETCHED_INCOMING = 33;
-  IMAP_MSG_PROCESSING = 34;
-  IMAP_MSG_DECRYPTED = 35;
-  IMAP_MSG_SAVED_LOCALLY = 36;
-  IMAP_MSG_DELETED_INCOMING = 37;
-  IMAP_UNHANDLED_ERROR = 38;
-  IMAP_UNREAD_MAIL = 39;
-  KEYMANAGER_LOOKING_FOR_KEY = 40;
-  KEYMANAGER_KEY_FOUND = 41;
-  KEYMANAGER_KEY_NOT_FOUND = 42;
-  KEYMANAGER_STARTED_KEY_GENERATION = 43;
-  KEYMANAGER_FINISHED_KEY_GENERATION = 44;
-  KEYMANAGER_DONE_UPLOADING_KEYS = 45;
-  SOLEDAD_INVALID_AUTH_TOKEN = 46;
-  SOLEDAD_SYNC_SEND_STATUS = 47;
-  SOLEDAD_SYNC_RECEIVE_STATUS = 48;
-}
-
-
-// A SignalRequest is the type of the message sent from one component to request
-// that a signal be sent to every registered component.
-
-message SignalRequest {
-  required Event event = 1;
-  required string content = 2;
-  required string mac_method = 3;
-  required bytes mac = 4;
-  optional string enc_method = 5;
-  optional bool error_occurred = 6;
-}
-
-
-// A RegisterRequest message tells the server that a component wants to
-// be signaled whenever a specific event occurs.
-
-message RegisterRequest {
-  required Event event = 1;
-  required int32 port = 2;
-  required string mac_method = 3;
-  required bytes mac = 4;
-}
-
-
-// An UnregisterRequest message tells the server that a component does not
-// want to be signaled when a specific event occurs.
-
-message UnregisterRequest {
-  required Event event = 1;
-  required int32 port = 2;
-  required string mac_method = 3;
-  required bytes mac = 4;
-}
-
-
-// A PingRequest message is used to find out if a server or component is
-// alive.
-
-message PingRequest {
-}
-
-
-// The EventResponse is the message sent back by server and components after
-// they receive other kinds of requests.
-
-message EventResponse {
-
-  enum Status {
-    OK = 1;
-    UNAUTH = 2;
-    ERROR = 3;
-  }
-
-  required Status status = 1;
-  optional string result = 2;
-}
-
-
-// The EventsServerService is the service provided by the server.
-
-service EventsServerService {
-  rpc ping(PingRequest) returns (EventResponse);
-  rpc register(RegisterRequest) returns (EventResponse);
-  rpc unregister(UnregisterRequest) returns (EventResponse);
-  rpc signal(SignalRequest) returns (EventResponse);
-}
-
-
-// EventsComponentService is the service provided by components (clients).
-
-service EventsClientService {
-  rpc ping(PingRequest) returns (EventResponse);
-  rpc signal(SignalRequest) returns (EventResponse);
-}
diff --git a/src/leap/common/events/events_pb2.py b/src/leap/common/events/events_pb2.py
deleted file mode 100644 (file)
index 9692ea1..0000000
+++ /dev/null
@@ -1,635 +0,0 @@
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-
-from google.protobuf import descriptor
-from google.protobuf import message
-from google.protobuf import reflection
-from google.protobuf import service
-from google.protobuf import service_reflection
-from google.protobuf import descriptor_pb2
-# @@protoc_insertion_point(imports)
-
-
-
-DESCRIPTOR = descriptor.FileDescriptor(
-  name='events.proto',
-  package='leap.common.events',
-  serialized_pb='\n\x0c\x65vents.proto\x12\x12leap.common.events\"\x97\x01\n\rSignalRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0f\n\x07\x63ontent\x18\x02 \x02(\t\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\x12\x12\n\nenc_method\x18\x05 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x06 \x01(\x08\"j\n\x0fRegisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"l\n\x11UnregisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"\r\n\x0bPingRequest\"\x82\x01\n\rEventResponse\x12\x38\n\x06status\x18\x01 \x02(\x0e\x32(.leap.common.events.EventResponse.Status\x12\x0e\n\x06result\x18\x02 \x01(\t\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03*\x9f\n\n\x05\x45vent\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\x12\x17\n\x13UPDATER_NEW_UPDATES\x10\x0b\x12\x19\n\x15UPDATER_DONE_UPDATING\x10\x0c\x12\x10\n\x0cRAISE_WINDOW\x10\r\x12\x18\n\x14SMTP_SERVICE_STARTED\x10\x0e\x12 \n\x1cSMTP_SERVICE_FAILED_TO_START\x10\x0f\x12%\n!SMTP_RECIPIENT_ACCEPTED_ENCRYPTED\x10\x10\x12\'\n#SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED\x10\x11\x12\x1b\n\x17SMTP_RECIPIENT_REJECTED\x10\x12\x12\x1f\n\x1bSMTP_START_ENCRYPT_AND_SIGN\x10\x13\x12\x1d\n\x19SMTP_END_ENCRYPT_AND_SIGN\x10\x14\x12\x13\n\x0fSMTP_START_SIGN\x10\x15\x12\x11\n\rSMTP_END_SIGN\x10\x16\x12\x1b\n\x17SMTP_SEND_MESSAGE_START\x10\x17\x12\x1d\n\x19SMTP_SEND_MESSAGE_SUCCESS\x10\x18\x12\x1b\n\x17SMTP_SEND_MESSAGE_ERROR\x10\x19\x12\x18\n\x14SMTP_CONNECTION_LOST\x10\x1a\x12\x18\n\x14IMAP_SERVICE_STARTED\x10\x1e\x12 \n\x1cIMAP_SERVICE_FAILED_TO_START\x10\x1f\x12\x15\n\x11IMAP_CLIENT_LOGIN\x10 \x12\x19\n\x15IMAP_FETCHED_INCOMING\x10!\x12\x17\n\x13IMAP_MSG_PROCESSING\x10\"\x12\x16\n\x12IMAP_MSG_DECRYPTED\x10#\x12\x1a\n\x16IMAP_MSG_SAVED_LOCALLY\x10$\x12\x1d\n\x19IMAP_MSG_DELETED_INCOMING\x10%\x12\x18\n\x14IMAP_UNHANDLED_ERROR\x10&\x12\x14\n\x10IMAP_UNREAD_MAIL\x10\'\x12\x1e\n\x1aKEYMANAGER_LOOKING_FOR_KEY\x10(\x12\x18\n\x14KEYMANAGER_KEY_FOUND\x10)\x12\x1c\n\x18KEYMANAGER_KEY_NOT_FOUND\x10*\x12%\n!KEYMANAGER_STARTED_KEY_GENERATION\x10+\x12&\n\"KEYMANAGER_FINISHED_KEY_GENERATION\x10,\x12\"\n\x1eKEYMANAGER_DONE_UPLOADING_KEYS\x10-\x12\x1e\n\x1aSOLEDAD_INVALID_AUTH_TOKEN\x10.\x12\x1c\n\x18SOLEDAD_SYNC_SEND_STATUS\x10/\x12\x1f\n\x1bSOLEDAD_SYNC_RECEIVE_STATUS\x10\x30\x32\xdd\x02\n\x13\x45ventsServerService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12R\n\x08register\x12#.leap.common.events.RegisterRequest\x1a!.leap.common.events.EventResponse\x12V\n\nunregister\x12%.leap.common.events.UnregisterRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse2\xb1\x01\n\x13\x45ventsClientService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponseB\x03\x90\x01\x01')
-
-_EVENT = descriptor.EnumDescriptor(
-  name='Event',
-  full_name='leap.common.events.Event',
-  filename=None,
-  file=DESCRIPTOR,
-  values=[
-    descriptor.EnumValueDescriptor(
-      name='CLIENT_SESSION_ID', index=0, number=1,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='CLIENT_UID', index=1, number=2,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_CREATING_KEYS', index=2, number=3,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_DONE_CREATING_KEYS', index=3, number=4,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_UPLOADING_KEYS', index=4, number=5,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_DONE_UPLOADING_KEYS', index=5, number=6,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_DOWNLOADING_KEYS', index=6, number=7,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_DONE_DOWNLOADING_KEYS', index=7, number=8,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_NEW_DATA_TO_SYNC', index=8, number=9,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_DONE_DATA_SYNC', index=9, number=10,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='UPDATER_NEW_UPDATES', index=10, number=11,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='UPDATER_DONE_UPDATING', index=11, number=12,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='RAISE_WINDOW', index=12, number=13,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_SERVICE_STARTED', index=13, number=14,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_SERVICE_FAILED_TO_START', index=14, number=15,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_RECIPIENT_ACCEPTED_ENCRYPTED', index=15, number=16,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED', index=16, number=17,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_RECIPIENT_REJECTED', index=17, number=18,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_START_ENCRYPT_AND_SIGN', index=18, number=19,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_END_ENCRYPT_AND_SIGN', index=19, number=20,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_START_SIGN', index=20, number=21,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_END_SIGN', index=21, number=22,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_SEND_MESSAGE_START', index=22, number=23,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_SEND_MESSAGE_SUCCESS', index=23, number=24,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_SEND_MESSAGE_ERROR', index=24, number=25,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SMTP_CONNECTION_LOST', index=25, number=26,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='IMAP_SERVICE_STARTED', index=26, number=30,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='IMAP_SERVICE_FAILED_TO_START', index=27, number=31,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='IMAP_CLIENT_LOGIN', index=28, number=32,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='IMAP_FETCHED_INCOMING', index=29, number=33,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='IMAP_MSG_PROCESSING', index=30, number=34,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='IMAP_MSG_DECRYPTED', index=31, number=35,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='IMAP_MSG_SAVED_LOCALLY', index=32, number=36,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='IMAP_MSG_DELETED_INCOMING', index=33, number=37,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='IMAP_UNHANDLED_ERROR', index=34, number=38,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='IMAP_UNREAD_MAIL', index=35, number=39,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='KEYMANAGER_LOOKING_FOR_KEY', index=36, number=40,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='KEYMANAGER_KEY_FOUND', index=37, number=41,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='KEYMANAGER_KEY_NOT_FOUND', index=38, number=42,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='KEYMANAGER_STARTED_KEY_GENERATION', index=39, number=43,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='KEYMANAGER_FINISHED_KEY_GENERATION', index=40, number=44,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='KEYMANAGER_DONE_UPLOADING_KEYS', index=41, number=45,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_INVALID_AUTH_TOKEN', index=42, number=46,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_SYNC_SEND_STATUS', index=43, number=47,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='SOLEDAD_SYNC_RECEIVE_STATUS', index=44, number=48,
-      options=None,
-      type=None),
-  ],
-  containing_type=None,
-  options=None,
-  serialized_start=557,
-  serialized_end=1868,
-)
-
-
-CLIENT_SESSION_ID = 1
-CLIENT_UID = 2
-SOLEDAD_CREATING_KEYS = 3
-SOLEDAD_DONE_CREATING_KEYS = 4
-SOLEDAD_UPLOADING_KEYS = 5
-SOLEDAD_DONE_UPLOADING_KEYS = 6
-SOLEDAD_DOWNLOADING_KEYS = 7
-SOLEDAD_DONE_DOWNLOADING_KEYS = 8
-SOLEDAD_NEW_DATA_TO_SYNC = 9
-SOLEDAD_DONE_DATA_SYNC = 10
-UPDATER_NEW_UPDATES = 11
-UPDATER_DONE_UPDATING = 12
-RAISE_WINDOW = 13
-SMTP_SERVICE_STARTED = 14
-SMTP_SERVICE_FAILED_TO_START = 15
-SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16
-SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17
-SMTP_RECIPIENT_REJECTED = 18
-SMTP_START_ENCRYPT_AND_SIGN = 19
-SMTP_END_ENCRYPT_AND_SIGN = 20
-SMTP_START_SIGN = 21
-SMTP_END_SIGN = 22
-SMTP_SEND_MESSAGE_START = 23
-SMTP_SEND_MESSAGE_SUCCESS = 24
-SMTP_SEND_MESSAGE_ERROR = 25
-SMTP_CONNECTION_LOST = 26
-IMAP_SERVICE_STARTED = 30
-IMAP_SERVICE_FAILED_TO_START = 31
-IMAP_CLIENT_LOGIN = 32
-IMAP_FETCHED_INCOMING = 33
-IMAP_MSG_PROCESSING = 34
-IMAP_MSG_DECRYPTED = 35
-IMAP_MSG_SAVED_LOCALLY = 36
-IMAP_MSG_DELETED_INCOMING = 37
-IMAP_UNHANDLED_ERROR = 38
-IMAP_UNREAD_MAIL = 39
-KEYMANAGER_LOOKING_FOR_KEY = 40
-KEYMANAGER_KEY_FOUND = 41
-KEYMANAGER_KEY_NOT_FOUND = 42
-KEYMANAGER_STARTED_KEY_GENERATION = 43
-KEYMANAGER_FINISHED_KEY_GENERATION = 44
-KEYMANAGER_DONE_UPLOADING_KEYS = 45
-SOLEDAD_INVALID_AUTH_TOKEN = 46
-SOLEDAD_SYNC_SEND_STATUS = 47
-SOLEDAD_SYNC_RECEIVE_STATUS = 48
-
-
-_EVENTRESPONSE_STATUS = descriptor.EnumDescriptor(
-  name='Status',
-  full_name='leap.common.events.EventResponse.Status',
-  filename=None,
-  file=DESCRIPTOR,
-  values=[
-    descriptor.EnumValueDescriptor(
-      name='OK', index=0, number=1,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='UNAUTH', index=1, number=2,
-      options=None,
-      type=None),
-    descriptor.EnumValueDescriptor(
-      name='ERROR', index=2, number=3,
-      options=None,
-      type=None),
-  ],
-  containing_type=None,
-  options=None,
-  serialized_start=515,
-  serialized_end=554,
-)
-
-
-_SIGNALREQUEST = descriptor.Descriptor(
-  name='SignalRequest',
-  full_name='leap.common.events.SignalRequest',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    descriptor.FieldDescriptor(
-      name='event', full_name='leap.common.events.SignalRequest.event', index=0,
-      number=1, type=14, cpp_type=8, label=2,
-      has_default_value=False, default_value=1,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='content', full_name='leap.common.events.SignalRequest.content', index=1,
-      number=2, type=9, cpp_type=9, label=2,
-      has_default_value=False, default_value=unicode("", "utf-8"),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='mac_method', full_name='leap.common.events.SignalRequest.mac_method', index=2,
-      number=3, type=9, cpp_type=9, label=2,
-      has_default_value=False, default_value=unicode("", "utf-8"),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='mac', full_name='leap.common.events.SignalRequest.mac', index=3,
-      number=4, type=12, cpp_type=9, label=2,
-      has_default_value=False, default_value="",
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='enc_method', full_name='leap.common.events.SignalRequest.enc_method', index=4,
-      number=5, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=unicode("", "utf-8"),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='error_occurred', full_name='leap.common.events.SignalRequest.error_occurred', index=5,
-      number=6, type=8, cpp_type=7, label=1,
-      has_default_value=False, default_value=False,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  options=None,
-  is_extendable=False,
-  extension_ranges=[],
-  serialized_start=37,
-  serialized_end=188,
-)
-
-
-_REGISTERREQUEST = descriptor.Descriptor(
-  name='RegisterRequest',
-  full_name='leap.common.events.RegisterRequest',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    descriptor.FieldDescriptor(
-      name='event', full_name='leap.common.events.RegisterRequest.event', index=0,
-      number=1, type=14, cpp_type=8, label=2,
-      has_default_value=False, default_value=1,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='port', full_name='leap.common.events.RegisterRequest.port', index=1,
-      number=2, type=5, cpp_type=1, label=2,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='mac_method', full_name='leap.common.events.RegisterRequest.mac_method', index=2,
-      number=3, type=9, cpp_type=9, label=2,
-      has_default_value=False, default_value=unicode("", "utf-8"),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='mac', full_name='leap.common.events.RegisterRequest.mac', index=3,
-      number=4, type=12, cpp_type=9, label=2,
-      has_default_value=False, default_value="",
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  options=None,
-  is_extendable=False,
-  extension_ranges=[],
-  serialized_start=190,
-  serialized_end=296,
-)
-
-
-_UNREGISTERREQUEST = descriptor.Descriptor(
-  name='UnregisterRequest',
-  full_name='leap.common.events.UnregisterRequest',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    descriptor.FieldDescriptor(
-      name='event', full_name='leap.common.events.UnregisterRequest.event', index=0,
-      number=1, type=14, cpp_type=8, label=2,
-      has_default_value=False, default_value=1,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='port', full_name='leap.common.events.UnregisterRequest.port', index=1,
-      number=2, type=5, cpp_type=1, label=2,
-      has_default_value=False, default_value=0,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='mac_method', full_name='leap.common.events.UnregisterRequest.mac_method', index=2,
-      number=3, type=9, cpp_type=9, label=2,
-      has_default_value=False, default_value=unicode("", "utf-8"),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='mac', full_name='leap.common.events.UnregisterRequest.mac', index=3,
-      number=4, type=12, cpp_type=9, label=2,
-      has_default_value=False, default_value="",
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  options=None,
-  is_extendable=False,
-  extension_ranges=[],
-  serialized_start=298,
-  serialized_end=406,
-)
-
-
-_PINGREQUEST = descriptor.Descriptor(
-  name='PingRequest',
-  full_name='leap.common.events.PingRequest',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  options=None,
-  is_extendable=False,
-  extension_ranges=[],
-  serialized_start=408,
-  serialized_end=421,
-)
-
-
-_EVENTRESPONSE = descriptor.Descriptor(
-  name='EventResponse',
-  full_name='leap.common.events.EventResponse',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    descriptor.FieldDescriptor(
-      name='status', full_name='leap.common.events.EventResponse.status', index=0,
-      number=1, type=14, cpp_type=8, label=2,
-      has_default_value=False, default_value=1,
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-    descriptor.FieldDescriptor(
-      name='result', full_name='leap.common.events.EventResponse.result', index=1,
-      number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=unicode("", "utf-8"),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-    _EVENTRESPONSE_STATUS,
-  ],
-  options=None,
-  is_extendable=False,
-  extension_ranges=[],
-  serialized_start=424,
-  serialized_end=554,
-)
-
-_SIGNALREQUEST.fields_by_name['event'].enum_type = _EVENT
-_REGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT
-_UNREGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT
-_EVENTRESPONSE.fields_by_name['status'].enum_type = _EVENTRESPONSE_STATUS
-_EVENTRESPONSE_STATUS.containing_type = _EVENTRESPONSE;
-DESCRIPTOR.message_types_by_name['SignalRequest'] = _SIGNALREQUEST
-DESCRIPTOR.message_types_by_name['RegisterRequest'] = _REGISTERREQUEST
-DESCRIPTOR.message_types_by_name['UnregisterRequest'] = _UNREGISTERREQUEST
-DESCRIPTOR.message_types_by_name['PingRequest'] = _PINGREQUEST
-DESCRIPTOR.message_types_by_name['EventResponse'] = _EVENTRESPONSE
-
-class SignalRequest(message.Message):
-  __metaclass__ = reflection.GeneratedProtocolMessageType
-  DESCRIPTOR = _SIGNALREQUEST
-  
-  # @@protoc_insertion_point(class_scope:leap.common.events.SignalRequest)
-
-class RegisterRequest(message.Message):
-  __metaclass__ = reflection.GeneratedProtocolMessageType
-  DESCRIPTOR = _REGISTERREQUEST
-  
-  # @@protoc_insertion_point(class_scope:leap.common.events.RegisterRequest)
-
-class UnregisterRequest(message.Message):
-  __metaclass__ = reflection.GeneratedProtocolMessageType
-  DESCRIPTOR = _UNREGISTERREQUEST
-  
-  # @@protoc_insertion_point(class_scope:leap.common.events.UnregisterRequest)
-
-class PingRequest(message.Message):
-  __metaclass__ = reflection.GeneratedProtocolMessageType
-  DESCRIPTOR = _PINGREQUEST
-  
-  # @@protoc_insertion_point(class_scope:leap.common.events.PingRequest)
-
-class EventResponse(message.Message):
-  __metaclass__ = reflection.GeneratedProtocolMessageType
-  DESCRIPTOR = _EVENTRESPONSE
-  
-  # @@protoc_insertion_point(class_scope:leap.common.events.EventResponse)
-
-
-_EVENTSSERVERSERVICE = descriptor.ServiceDescriptor(
-  name='EventsServerService',
-  full_name='leap.common.events.EventsServerService',
-  file=DESCRIPTOR,
-  index=0,
-  options=None,
-  serialized_start=1871,
-  serialized_end=2220,
-  methods=[
-  descriptor.MethodDescriptor(
-    name='ping',
-    full_name='leap.common.events.EventsServerService.ping',
-    index=0,
-    containing_service=None,
-    input_type=_PINGREQUEST,
-    output_type=_EVENTRESPONSE,
-    options=None,
-  ),
-  descriptor.MethodDescriptor(
-    name='register',
-    full_name='leap.common.events.EventsServerService.register',
-    index=1,
-    containing_service=None,
-    input_type=_REGISTERREQUEST,
-    output_type=_EVENTRESPONSE,
-    options=None,
-  ),
-  descriptor.MethodDescriptor(
-    name='unregister',
-    full_name='leap.common.events.EventsServerService.unregister',
-    index=2,
-    containing_service=None,
-    input_type=_UNREGISTERREQUEST,
-    output_type=_EVENTRESPONSE,
-    options=None,
-  ),
-  descriptor.MethodDescriptor(
-    name='signal',
-    full_name='leap.common.events.EventsServerService.signal',
-    index=3,
-    containing_service=None,
-    input_type=_SIGNALREQUEST,
-    output_type=_EVENTRESPONSE,
-    options=None,
-  ),
-])
-
-class EventsServerService(service.Service):
-  __metaclass__ = service_reflection.GeneratedServiceType
-  DESCRIPTOR = _EVENTSSERVERSERVICE
-class EventsServerService_Stub(EventsServerService):
-  __metaclass__ = service_reflection.GeneratedServiceStubType
-  DESCRIPTOR = _EVENTSSERVERSERVICE
-
-
-_EVENTSCLIENTSERVICE = descriptor.ServiceDescriptor(
-  name='EventsClientService',
-  full_name='leap.common.events.EventsClientService',
-  file=DESCRIPTOR,
-  index=1,
-  options=None,
-  serialized_start=2223,
-  serialized_end=2400,
-  methods=[
-  descriptor.MethodDescriptor(
-    name='ping',
-    full_name='leap.common.events.EventsClientService.ping',
-    index=0,
-    containing_service=None,
-    input_type=_PINGREQUEST,
-    output_type=_EVENTRESPONSE,
-    options=None,
-  ),
-  descriptor.MethodDescriptor(
-    name='signal',
-    full_name='leap.common.events.EventsClientService.signal',
-    index=1,
-    containing_service=None,
-    input_type=_SIGNALREQUEST,
-    output_type=_EVENTRESPONSE,
-    options=None,
-  ),
-])
-
-class EventsClientService(service.Service):
-  __metaclass__ = service_reflection.GeneratedServiceType
-  DESCRIPTOR = _EVENTSCLIENTSERVICE
-class EventsClientService_Stub(EventsClientService):
-  __metaclass__ = service_reflection.GeneratedServiceStubType
-  DESCRIPTOR = _EVENTSCLIENTSERVICE
-
-# @@protoc_insertion_point(module_scope)
index 41aede3..a69202e 100644 (file)
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-A server for the events mechanism.
-
-A server can receive different kinds of requests from clients:
 
-  1. Registration request: store client port number to be notified when
-     a specific signal arrives.
 
-  2. Signal request: redistribute the signal to registered clients.
 """
-import logging
-import socket
+The server for the events mechanism.
+"""
 
 
-from protobuf.socketrpc import RpcService
-from leap.common.events import (
-    events_pb2 as proto,
-    daemon,
-)
+import logging
+import txzmq
 
+from leap.common.zmq_utils import zmq_has_curve
 
-logger = logging.getLogger(__name__)
+from leap.common.events.zmq_components import TxZmqServerComponent
 
 
-SERVER_PORT = 8090
+if zmq_has_curve():
+    EMIT_ADDR = "tcp://127.0.0.1:9000"
+    REG_ADDR = "tcp://127.0.0.1:9001"
+else:
+    EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0"
+    REG_ADDR = "ipc:///tmp/leap.common.events.socket.1"
 
-# the `registered_clients` dictionary below should have the following
-# format:
-#
-#     { event_signal: [ port, ... ], ... }
-#
-registered_clients = {}
 
-
-class PortAlreadyTaken(Exception):
-    """
-    Raised when trying to open a server in a port that is already taken.
-    """
-    pass
+logger = logging.getLogger(__name__)
 
 
-def ensure_server(port=SERVER_PORT):
+def ensure_server(emit_addr=EMIT_ADDR, reg_addr=REG_ADDR):
     """
-    Make sure the server is running on the given port.
+    Make sure the server is running in the given addresses.
 
-    Attempt to connect to given local port. Upon success, assume that the
-    events server has already been started. Upon failure, start events server.
+    :param emit_addr: The address in which to receive events from clients.
+    :type emit_addr: str
+    :param reg_addr: The address to which publish events to clients.
+    :type reg_addr: str
 
-    :param port: the port in which server should be listening
-    :type port: int
-
-    :return: the daemon instance or nothing
-    :rtype: EventsServerDaemon or None
-
-    :raise PortAlreadyTaken: Raised if C{port} is already taken by something
-                             that is not an events server.
-    """
-    try:
-        # check if port is available
-        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        s.connect(('localhost', port))
-        s.close()
-        # port is taken, check if there's a server running there
-        response = ping(port=port, timeout=1000)
-        if response is not None and response.status == proto.EventResponse.OK:
-            logger.info('A server is already running on port %d.', port)
-            return
-        # port is taken, and not by an events server
-        logger.warning(
-            'Port %d is taken by something not an events server.', port)
-        raise PortAlreadyTaken(port)
-    except socket.error:
-        # port is available, run a server
-        logger.info('Launching server on port %d.', port)
-        return EventsServerDaemon.ensure(port)
-
-
-def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
+    :return: an events server instance
+    :rtype: EventsServer
     """
-    Ping the server.
-
-    :param port: the port in which server should be listening
-    :type port: int
-    :param reqcbk: a callback to be called when a response from server is
-                   received
-    :type reqcbk: function(proto.PingRequest, proto.EventResponse)
-    :param timeout: the timeout for synch calls
-    :type timeout: int
-
-    :return: the response from server for synch calls or nothing for asynch
-             calls.
-    :rtype: leap.common.events.events_pb2.EventsResponse or None
-    """
-    request = proto.PingRequest()
-    service = RpcService(
-        proto.EventsServerService_Stub,
-        port,
-        'localhost')
-    logger.debug("Pinging server in port %d..." % port)
-    return service.ping(request, callback=reqcbk, timeout=timeout)
+    _server = EventsServer(emit_addr, reg_addr)
+    return _server
 
 
-class EventsServerService(proto.EventsServerService):
+class EventsServer(TxZmqServerComponent):
     """
-    Service for receiving events in clients.
+    An events server that listens for events in one address and publishes those
+    events in another address.
     """
 
-    def register(self, controller, request, done):
-        """
-        Register a client port to be signaled when specific events come in.
-
-        :param controller: used to mediate a single method call
-        :type controller: protobuf.socketrpc.controller.SocketRpcController
-        :param request: the request received from the client
-        :type request: leap.common.events.events_pb2.RegisterRequest
-        :param done: callback to be called when done
-        :type done: protobuf.socketrpc.server.Callback
-        """
-        logger.info("Received registration request: %s..." % str(request)[:40])
-        # add client port to signal list
-        if request.event not in registered_clients:
-            registered_clients[request.event] = set([])
-        registered_clients[request.event].add(request.port)
-        # send response back to client
-
-        logger.debug('sending response back')
-        response = proto.EventResponse()
-        response.status = proto.EventResponse.OK
-        done.run(response)
-
-    def unregister(self, controller, request, done):
-        """
-        Unregister a client port so it will not be signaled when specific
-        events come in.
-
-        :param controller: used to mediate a single method call
-        :type controller: protobuf.socketrpc.controller.SocketRpcController
-        :param request: the request received from the client
-        :type request: leap.common.events.events_pb2.RegisterRequest
-        :param done: callback to be called when done
-        :type done: protobuf.socketrpc.server.Callback
-        """
-        logger.info(
-            "Received unregistration request: %s..." % str(request)[:40])
-        # remove client port from signal list
-        response = proto.EventResponse()
-        if request.event in registered_clients:
-            try:
-                registered_clients[request.event].remove(request.port)
-                response.status = proto.EventResponse.OK
-            except KeyError:
-                response.status = proto.EventsResponse.ERROR
-                response.result = 'Port %d not registered.' % request.port
-        # send response back to client
-        logger.debug('sending response back')
-        done.run(response)
-
-    def signal(self, controller, request, done):
-        """
-        Perform an RPC call to signal all clients registered to receive a
-        specific signal.
-
-        :param controller: used to mediate a single method call
-        :type controller: protobuf.socketrpc.controller.SocketRpcController
-        :param request: the request received from the client
-        :type request: leap.common.events.events_pb2.SignalRequest
-        :param done: callback to be called when done
-        :type done: protobuf.socketrpc.server.Callback
-        """
-        logger.debug('Received signal from client: %s...', str(request)[:40])
-        # send signal to all registered clients
-        # TODO: verify signal auth
-        if request.event in registered_clients:
-            for port in registered_clients[request.event]:
-
-                def callback(req, resp):
-                    logger.debug("Signal received by " + str(port))
-
-                service = RpcService(proto.EventsClientService_Stub,
-                                     port, 'localhost')
-                service.signal(request, callback=callback)
-        # send response back to client
-        response = proto.EventResponse()
-        response.status = proto.EventResponse.OK
-        done.run(response)
-
-    def ping(self, controller, request, done):
+    def __init__(self, emit_addr, reg_addr):
         """
-        Reply to a ping request.
-
-        :param controller: used to mediate a single method call
-        :type controller: protobuf.socketrpc.controller.SocketRpcController
-        :param request: the request received from the client
-        :type request: leap.common.events.events_pb2.RegisterRequest
-        :param done: callback to be called when done
-        :type done: protobuf.socketrpc.server.Callback
-        """
-        logger.debug("Received ping request, sending response.")
-        response = proto.EventResponse()
-        response.status = proto.EventResponse.OK
-        done.run(response)
-
+        Initialize the events server.
 
-class EventsServerDaemon(daemon.EventsSingletonDaemon):
-    """
-    Singleton class for starting an events server daemon.
-    """
-
-    @classmethod
-    def ensure(cls, port):
+        :param emit_addr: The address in which to receive events from clients.
+        :type emit_addr: str
+        :param reg_addr: The address to which publish events to clients.
+        :type reg_addr: str
         """
-        Make sure the daemon is running on the given port.
-
-        :param port: the port in which the daemon should listen
-        :type port: int
+        TxZmqServerComponent.__init__(self)
+        # bind PULL and PUB sockets
+        self._pull, self.pull_port = self._zmq_bind(
+            txzmq.ZmqPullConnection, emit_addr)
+        self._pub, self.pub_port = self._zmq_bind(
+            txzmq.ZmqPubConnection, reg_addr)
+        # set a handler for arriving messages
+        self._pull.onPull = self._onPull
+
+    def _onPull(self, message):
+        """
+        Callback executed when a message is pulled from a client.
 
-        :return: a daemon instance
-        :rtype: EventsServerDaemon
+        :param message: The message sent by the client.
+        :type message: str
         """
-        return cls.ensure_service(port, EventsServerService())
+        event, content = message[0].split(b"\0", 1)
+        logger.debug("Publishing event: %s" % event)
+        self._pub.publish(content, tag=event)
diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py
new file mode 100644 (file)
index 0000000..8206ed5
--- /dev/null
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+# txclient.py
+# Copyright (C) 2013, 2014, 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+The client end point of the events mechanism, implemented using txzmq.
+
+Clients are the communicating parties of the events mechanism. They
+communicate by sending messages to a server, which in turn redistributes
+messages to other clients.
+
+When a client registers a callback for a given event, it also tells the
+server that it wants to be notified whenever events of that type are sent by
+some other client.
+"""
+
+
+import logging
+import pickle
+
+import txzmq
+
+from leap.common.events.zmq_components import TxZmqClientComponent
+from leap.common.events.client import EventsClient
+from leap.common.events.client import configure_client
+from leap.common.events.server import EMIT_ADDR
+from leap.common.events.server import REG_ADDR
+from leap.common.events import catalog
+
+
+logger = logging.getLogger(__name__)
+
+
+__all__ = [
+    "configure_client",
+    "EventsTxClient",
+    "register",
+    "unregister",
+    "emit",
+    "shutdown",
+]
+
+
+class EventsTxClient(TxZmqClientComponent, EventsClient):
+    """
+    A twisted events client that listens for events in one address and
+    publishes those events to another address.
+    """
+
+    def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
+            path_prefix=None):
+        """
+        Initialize the events server.
+        """
+        TxZmqClientComponent.__init__(self, path_prefix=path_prefix)
+        EventsClient.__init__(self, emit_addr, reg_addr)
+        # connect SUB first, otherwise we might miss some event sent from this
+        # same client
+        self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
+        self._sub.gotMessage = self._gotMessage
+        self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)
+
+    def _gotMessage(self, msg, tag):
+        """
+        Handle an incoming event.
+
+        :param msg: The incoming message.
+        :type msg: list(str)
+        """
+        event = getattr(catalog, tag)
+        content = pickle.loads(msg)
+        self._handle_event(event, content)
+
+    def _subscribe(self, tag):
+        """
+        Subscribe to a tag on the zmq SUB socket.
+
+        :param tag: The tag to be subscribed.
+        :type tag: str
+        """
+        self._sub.subscribe(tag)
+
+    def _unsubscribe(self, tag):
+        """
+        Unsubscribe from a tag on the zmq SUB socket.
+
+        :param tag: The tag to be unsubscribed.
+        :type tag: str
+        """
+        self._sub.unsubscribe(tag)
+
+    def _send(self, data):
+        """
+        Send data through PUSH socket.
+
+        :param data: The data to be sent.
+        :type event: str
+        """
+        self._push.send(data)
+
+    def shutdown(self):
+        TxZmqClientComponent.shutdown(self)
+        EventsClient.shutdown(self)
+
+
+def register(event, callback, uid=None, replace=False):
+    """
+    Register a callback to be executed when an event is received.
+
+    :param event: The event that triggers the callback.
+    :type event: str
+    :param callback: The callback to be executed.
+    :type callback: callable(event, content)
+    :param uid: The callback uid.
+    :type uid: str
+    :param replace: Wether an eventual callback with same ID should be
+                    replaced.
+    :type replace: bool
+
+    :return: The callback uid.
+    :rtype: str
+
+    :raises CallbackAlreadyRegisteredError: when there's already a callback
+            identified by the given uid and replace is False.
+    """
+    return EventsTxClient.instance().register(
+        event, callback, uid=uid, replace=replace)
+
+
+def unregister(event, uid=None):
+    """
+    Unregister callbacks for an event.
+
+    If uid is not None, then only the callback identified by the given uid is
+    removed. Otherwise, all callbacks for the event are removed.
+
+    :param event: The event that triggers the callback.
+    :type event: str
+    :param uid: The callback uid.
+    :type uid: str
+    """
+    return EventsTxClient.instance().unregister(event, uid=uid)
+
+
+def emit(event, *content):
+    """
+    Send an event.
+
+    :param event: The event to be sent.
+    :type event: str
+    :param content: The content of the event.
+    :type content: list
+    """
+    return EventsTxClient.instance().emit(event, *content)
+
+
+def shutdown():
+    """
+    Shutdown the events client.
+    """
+    EventsTxClient.instance().shutdown()
+
+
+def instance():
+    """
+    Return an instance of the events client.
+
+    :return: An instance of the events client.
+    :rtype: EventsClientThread
+    """
+    return EventsTxClient.instance()
diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py
new file mode 100644 (file)
index 0000000..4fb95d3
--- /dev/null
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+# zmq.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+The server for the events mechanism.
+"""
+
+
+import os
+import logging
+import txzmq
+import re
+
+from abc import ABCMeta
+
+# XXX some distros don't package libsodium, so we have to be prepared for
+#     absence of zmq.auth
+try:
+    import zmq.auth
+    from zmq.auth.thread import ThreadAuthenticator
+except ImportError:
+    pass
+
+from leap.common.config import get_path_prefix
+from leap.common.zmq_utils import zmq_has_curve
+from leap.common.zmq_utils import maybe_create_and_get_certificates
+from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
+
+
+logger = logging.getLogger(__name__)
+
+
+ADDRESS_RE = re.compile("(.+)://(.+):([0-9]+)")
+
+
+class TxZmqComponent(object):
+    """
+    A twisted-powered zmq events component.
+    """
+
+    __metaclass__ = ABCMeta
+
+    _component_type = None
+
+    def __init__(self, path_prefix=None):
+        """
+        Initialize the txzmq component.
+        """
+        self._factory = txzmq.ZmqFactory()
+        self._factory.registerForShutdown()
+        if path_prefix == None:
+            path_prefix = get_path_prefix()
+        self._config_prefix = os.path.join(path_prefix, "leap", "events")
+        self._connections = []
+
+    @property
+    def component_type(self):
+        if not self._component_type:
+            raise Exception(
+                "Make sure implementations of TxZmqComponent"
+                "define a self._component_type!")
+        return self._component_type
+
+    def _zmq_connect(self, connClass, address):
+        """
+        Connect to an address.
+
+        :param connClass: The connection class to be used.
+        :type connClass: txzmq.ZmqConnection
+        :param address: The address to connect to.
+        :type address: str
+
+        :return: The binded connection.
+        :rtype: txzmq.ZmqConnection
+        """
+        connection = connClass(self._factory)
+        # create and configure socket
+        socket = connection.socket
+        if zmq_has_curve():
+            public, secret = maybe_create_and_get_certificates(
+                self._config_prefix, self.component_type)
+            server_public_file = os.path.join(
+                self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
+            server_public, _ = zmq.auth.load_certificate(server_public_file)
+            socket.curve_publickey = public
+            socket.curve_secretkey = secret
+            socket.curve_serverkey = server_public
+        socket.connect(address)
+        logger.debug("Connected %s to %s." % (connClass, address))
+        self._connections.append(connection)
+        return connection
+
+    def _zmq_bind(self, connClass, address):
+        """
+        Bind to an address.
+
+        :param connClass: The connection class to be used.
+        :type connClass: txzmq.ZmqConnection
+        :param address: The address to bind to.
+        :type address: str
+
+        :return: The binded connection and port.
+        :rtype: (txzmq.ZmqConnection, int)
+        """
+        connection = connClass(self._factory)
+        socket = connection.socket
+        if zmq_has_curve():
+            public, secret = maybe_create_and_get_certificates(
+                self._config_prefix, self.component_type)
+            socket.curve_publickey = public
+            socket.curve_secretkey = secret
+            self._start_thread_auth(connection.socket)
+        # check if port was given
+        protocol, addr, port = ADDRESS_RE.match(address).groups()
+        if port == "0":
+            port = socket.bind_to_random_port("%s://%s" % (protocol, addr))
+        else:
+            socket.bind(address)
+            port = int(port)
+        logger.debug("Binded %s to %s://%s:%d."
+                     % (connClass, protocol, addr, port))
+        self._connections.append(connection)
+        return connection, port
+
+    def _start_thread_auth(self, socket):
+        """
+        Start the zmq curve thread authenticator.
+
+        :param socket: The socket in which to configure the authenticator.
+        :type socket: zmq.Socket
+        """
+        authenticator = ThreadAuthenticator(self._factory.context)
+        authenticator.start()
+        # XXX do not hardcode this here.
+        authenticator.allow('127.0.0.1')
+        # tell authenticator to use the certificate in a directory
+        public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX)
+        authenticator.configure_curve(domain="*", location=public_keys_dir)
+        socket.curve_server = True  # must come before bind
+
+    def shutdown(self):
+        """
+        Shutdown the component.
+        """
+        logger.debug("Shutting down component %s." % str(self))
+        for conn in self._connections:
+            conn.shutdown()
+        self._factory.shutdown()
+
+
+class TxZmqServerComponent(TxZmqComponent):
+    """
+    A txZMQ server component.
+    """
+
+    _component_type = "server"
+
+
+class TxZmqClientComponent(TxZmqComponent):
+    """
+    A txZMQ client component.
+    """
+
+    _component_type = "client"
index 0779b2e..7ef3e1b 100644 (file)
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-import unittest
-import sets
-import time
-import socket
-import threading
-import random
-
-
-from mock import Mock
-from protobuf.socketrpc import RpcService
-from leap.common import events
-from leap.common.events import (
-    server,
-    client,
-    mac_auth,
-)
-from leap.common.events.events_pb2 import (
-    EventsServerService,
-    EventsServerService_Stub,
-    EventsClientService_Stub,
-    EventResponse,
-    SignalRequest,
-    RegisterRequest,
-    PingRequest,
-    SOLEDAD_CREATING_KEYS,
-    CLIENT_UID,
-)
 
+import os
+import logging
+import time
 
-port = 8090
+from twisted.trial import unittest
+from twisted.internet import defer
 
-received = False
+from leap.common.events import server
+from leap.common.events import client
+from leap.common.events import txclient
+from leap.common.events import catalog
+from leap.common.events.errors import CallbackAlreadyRegisteredError
 
 
-class EventsTestCase(unittest.TestCase):
+if 'DEBUG' in os.environ:
+    logging.basicConfig(level=logging.DEBUG)
 
-    @classmethod
-    def setUpClass(cls):
-        server.EventsServerDaemon.ensure(8090)
-        cls.callbacks = events.client.registered_callbacks
 
-    @classmethod
-    def tearDownClass(cls):
-        # give some time for requests to be processed.
-        time.sleep(1)
+class EventsGenericClientTestCase(object):
 
     def setUp(self):
-        super(EventsTestCase, self).setUp()
+        self._server = server.ensure_server(
+            emit_addr="tcp://127.0.0.1:0",
+            reg_addr="tcp://127.0.0.1:0")
+        self._client.configure_client(
+            emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
+            reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)
 
     def tearDown(self):
-        #events.client.registered_callbacks = {}
-        server.registered_clients = {}
-        super(EventsTestCase, self).tearDown()
-
-    def test_service_singleton(self):
-        """
-        Ensure that there's always just one instance of the server daemon
-        running.
-        """
-        service1 = server.EventsServerDaemon.ensure(8090)
-        service2 = server.EventsServerDaemon.ensure(8090)
-        self.assertEqual(service1, service2,
-                         "Can't get singleton class for service.")
+        self._client.shutdown()
+        self._server.shutdown()
+        # wait a bit for sockets to close properly
+        time.sleep(0.1)
 
     def test_client_register(self):
         """
         Ensure clients can register callbacks.
         """
-        self.assertTrue(1 not in self.callbacks,
-                        'There should should be no callback for this signal.')
-        events.register(1, lambda x: True)
-        self.assertTrue(1 in self.callbacks,
-                        'Could not register signal in local client.')
-        events.register(2, lambda x: True)
-        self.assertTrue(1 in self.callbacks,
-                        'Could not register signal in local client.')
-        self.assertTrue(2 in self.callbacks,
-                        'Could not register signal in local client.')
+        callbacks = self._client.instance().callbacks
+        self.assertTrue(len(callbacks) == 0,
+                        'There should be no callback for this event.')
+        # register one event
+        event1 = catalog.CLIENT_UID
+        cbk1 = lambda event, _: True
+        uid1 = self._client.register(event1, cbk1)
+        # assert for correct registration
+        self.assertTrue(len(callbacks) == 1)
+        self.assertTrue(callbacks[event1][uid1] == cbk1,
+                        'Could not register event in local client.')
+        # register another event
+        event2 = catalog.CLIENT_SESSION_ID
+        cbk2 = lambda event, _: True
+        uid2 = self._client.register(event2, cbk2)
+        # assert for correct registration
+        self.assertTrue(len(callbacks) == 2)
+        self.assertTrue(callbacks[event2][uid2] == cbk2,
+                        'Could not register event in local client.')
 
     def test_register_signal_replace(self):
         """
         Make sure clients can replace already registered callbacks.
         """
-        sig = 3
-        cbk = lambda x: True
-        events.register(sig, cbk, uid=1)
-        self.assertRaises(Exception, events.register, sig, lambda x: True,
-                          uid=1)
-        events.register(sig, lambda x: True, uid=1, replace=True)
-        self.assertTrue(sig in self.callbacks, 'Could not register signal.')
-        self.assertEqual(1, len(self.callbacks[sig]),
-                         'Wrong number of registered callbacks.')
-
-    def test_signal_response_status(self):
-        """
-        Ensure there's an appropriate response from server when signaling.
-        """
-        sig = 4
-        request = SignalRequest()
-        request.event = sig
-        request.content = 'my signal contents'
-        request.mac_method = mac_auth.MacMethod.MAC_NONE
-        request.mac = ""
-        service = RpcService(EventsServerService_Stub, port, 'localhost')
-        # test synch
-        response = service.signal(request, timeout=1000)
-        self.assertEqual(EventResponse.OK, response.status,
-                         'Wrong response status.')
+        event = catalog.CLIENT_UID
+        d = defer.Deferred()
+        cbk_fail = lambda event, _: d.errback(event)
+        cbk_succeed = lambda event, _: d.callback(event)
+        self._client.register(event, cbk_fail, uid=1)
+        self._client.register(event, cbk_succeed, uid=1, replace=True)
+        self._client.emit(event, None)
+        return d
 
-    def test_signal_executes_callback(self):
-        """
-        Ensure callback is executed upon receiving signal.
+    def test_register_signal_replace_fails_when_replace_is_false(self):
         """
-        sig = CLIENT_UID
-        request = SignalRequest()
-        request.event = sig
-        request.content = 'my signal contents'
-        request.mac_method = mac_auth.MacMethod.MAC_NONE
-        request.mac = ""
-        service = RpcService(EventsServerService_Stub, port, 'localhost')
-
-        # register a callback
-        flag = Mock()
-        events.register(sig, lambda req: flag(req.event))
-        # signal
-        response = service.signal(request)
-        self.assertEqual(EventResponse.OK, response.status,
-                         'Wrong response status.')
-        time.sleep(1)  # wait for signal to arrive
-        flag.assert_called_once_with(sig)
-
-    def test_events_server_service_register(self):
+        Make sure clients trying to replace already registered callbacks fail
+        when replace=False
         """
-        Ensure the server can register clients to be signaled.
-        """
-        sig = 5
-        request = RegisterRequest()
-        request.event = sig
-        request.port = 8091
-        request.mac_method = mac_auth.MacMethod.MAC_NONE
-        request.mac = ""
-        service = RpcService(EventsServerService_Stub, port, 'localhost')
-        complist = server.registered_clients
-        self.assertEqual({}, complist,
-                         'There should be no registered_ports when '
-                         'server has just been created.')
-        response = service.register(request, timeout=1000)
-        self.assertTrue(sig in complist, "Signal not registered succesfully.")
-        self.assertTrue(8091 in complist[sig],
-                        'Failed registering client port.')
+        event = catalog.CLIENT_UID
+        self._client.register(event, lambda event, _: None, uid=1)
+        self.assertRaises(
+            CallbackAlreadyRegisteredError,
+            self._client.register,
+            event, lambda event, _: None, uid=1, replace=False)
 
-    def test_client_request_register(self):
+    def test_register_more_than_one_callback_works(self):
         """
-        Ensure clients can register themselves with server.
+        Make sure clients can replace already registered callbacks.
         """
-        sig = 6
-        complist = server.registered_clients
-        self.assertTrue(sig not in complist,
-                        'There should be no registered clients for this '
-                        'signal.')
-        events.register(sig, lambda x: True)
-        time.sleep(0.1)
-        port = client.EventsClientDaemon.get_instance().get_port()
-        self.assertTrue(sig in complist, 'Failed registering client.')
-        self.assertTrue(port in complist[sig],
-                        'Failed registering client port.')
+        event = catalog.CLIENT_UID
+        d1 = defer.Deferred()
+        cbk1 = lambda event, _: d1.callback(event)
+        d2 = defer.Deferred()
+        cbk2 = lambda event, _: d2.callback(event)
+        self._client.register(event, cbk1)
+        self._client.register(event, cbk2)
+        self._client.emit(event, None)
+        d = defer.gatherResults([d1, d2])
+        return d
 
     def test_client_receives_signal(self):
         """
         Ensure clients can receive signals.
         """
-        sig = 7
-        flag = Mock()
-
-        events.register(sig, lambda req: flag(req.event))
-        request = SignalRequest()
-        request.event = sig
-        request.content = ""
-        request.mac_method = mac_auth.MacMethod.MAC_NONE
-        request.mac = ""
-        service = RpcService(EventsServerService_Stub, port, 'localhost')
-        response = service.signal(request, timeout=1000)
-        self.assertTrue(response is not None, 'Did not receive response.')
-        time.sleep(0.5)
-        flag.assert_called_once_with(sig)
-
-    def test_client_send_signal(self):
-        """
-        Ensure clients can send signals.
-        """
-        sig = 8
-        response = events.signal(sig)
-        self.assertTrue(response.status == response.OK,
-                        'Received wrong response status when signaling.')
+        event = catalog.CLIENT_UID
+        d = defer.Deferred()
+        def cbk(events, _):
+            d.callback(event)
+        self._client.register(event, cbk)
+        self._client.emit(event, None)
+        return d
 
     def test_client_unregister_all(self):
         """
         Test that the client can unregister all events for one signal.
         """
-        sig = CLIENT_UID
-        complist = server.registered_clients
-        events.register(sig, lambda x: True)
-        events.register(sig, lambda x: True)
-        time.sleep(0.1)
-        events.unregister(sig)
-        time.sleep(0.1)
-        port = client.EventsClientDaemon.get_instance().get_port()
-        self.assertFalse(bool(complist[sig]))
-        self.assertTrue(port not in complist[sig])
+        event1 = catalog.CLIENT_UID
+        d = defer.Deferred()
+        # register more than one callback for the same event
+        self._client.register(event1, lambda ev, _: d.errback(None))
+        self._client.register(event1, lambda ev, _: d.errback(None))
+        # unregister and emit the event
+        self._client.unregister(event1)
+        self._client.emit(event1, None)
+        # register and emit another event so the deferred can succeed
+        event2 = catalog.CLIENT_SESSION_ID
+        self._client.register(event2, lambda ev, _: d.callback(None))
+        self._client.emit(event2, None)
+        return d
 
     def test_client_unregister_by_uid(self):
         """
         Test that the client can unregister an event by uid.
         """
-        sig = CLIENT_UID
-        complist = server.registered_clients
-        events.register(sig, lambda x: True, uid='cbkuid')
-        events.register(sig, lambda x: True, uid='cbkuid2')
-        time.sleep(0.1)
-        events.unregister(sig, uid='cbkuid')
-        time.sleep(0.1)
-        port = client.EventsClientDaemon.get_instance().get_port()
-        self.assertTrue(sig in complist)
-        self.assertTrue(len(complist[sig]) == 1)
-        self.assertTrue(
-            client.registered_callbacks[sig].pop()[0] == 'cbkuid2')
-        self.assertTrue(port in complist[sig])
-
-    def test_server_replies_ping(self):
-        """
-        Ensure server replies to a ping.
-        """
-        request = PingRequest()
-        service = RpcService(EventsServerService_Stub, port, 'localhost')
-        response = service.ping(request, timeout=1000)
-        self.assertIsNotNone(response)
-        self.assertEqual(EventResponse.OK, response.status,
-                         'Wrong response status.')
-
-    def test_client_replies_ping(self):
-        """
-        Ensure clients reply to a ping.
-        """
-        daemon = client.ensure_client_daemon()
-        port = daemon.get_port()
-        request = PingRequest()
-        service = RpcService(EventsClientService_Stub, port, 'localhost')
-        response = service.ping(request, timeout=1000)
-        self.assertEqual(EventResponse.OK, response.status,
-                         'Wrong response status.')
-
-    def test_server_ping(self):
-        """
-        Ensure the function from server module pings correctly.
-        """
-        response = server.ping()
-        self.assertIsNotNone(response)
-        self.assertEqual(EventResponse.OK, response.status,
-                         'Wrong response status.')
-
-    def test_client_ping(self):
-        """
-        Ensure the function from client module pings correctly.
-        """
-        daemon = client.ensure_client_daemon()
-        response = client.ping(daemon.get_port())
-        self.assertIsNotNone(response)
-        self.assertEqual(EventResponse.OK, response.status,
-                         'Wrong response status.')
-
-    def test_module_ping_server(self):
-        """
-        Ensure the function from main module pings server correctly.
-        """
-        response = events.ping_server()
-        self.assertIsNotNone(response)
-        self.assertEqual(EventResponse.OK, response.status,
-                         'Wrong response status.')
-
-    def test_module_ping_client(self):
-        """
-        Ensure the function from main module pings clients correctly.
-        """
-        daemon = client.ensure_client_daemon()
-        response = events.ping_client(daemon.get_port())
-        self.assertIsNotNone(response)
-        self.assertEqual(EventResponse.OK, response.status,
-                         'Wrong response status.')
-
-    def test_ensure_server_raises_if_port_taken(self):
-        """
-        Verify that server raises an exception if port is already taken.
-        """
-        # get a random free port
-        while True:
-            port = random.randint(1024, 65535)
-            try:
-                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                s.connect(('localhost', port))
-                s.close()
-            except:
-                break
-
-        class PortBlocker(threading.Thread):
-
-            def run(self):
-                conns = 0
-                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                s.bind(('localhost', port))
-                s.setblocking(1)
-                s.listen(1)
-                while conns < 2:  # blocks until rece
-                    conns += 1
-                    s.accept()
-                s.close()
-
-        # block the port
-        taker = PortBlocker()
-        taker.start()
-        time.sleep(1)  # wait for thread to start.
-        self.assertRaises(
-            server.PortAlreadyTaken, server.ensure_server, port)
-
-    def test_async_register(self):
-        """
-        Test asynchronous registering of callbacks.
-        """
-        flag = Mock()
+        event = catalog.CLIENT_UID
+        d = defer.Deferred()
+        # register one callback that would fail
+        uid = self._client.register(event, lambda ev, _: d.errback(None))
+        # register one callback that will succeed
+        self._client.register(event, lambda ev, _: d.callback(None))
+        # unregister by uid and emit the event
+        self._client.unregister(event, uid=uid)
+        self._client.emit(event, None)
+        return d
 
-        # executed after async register, when response is received from server
-        def reqcbk(request, response):
-            flag(request.event, response.status)
 
-        # callback registered by application
-        def callback(request):
-            pass
+class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
 
-        # passing a callback as reqcbk param makes the call asynchronous
-        result = events.register(CLIENT_UID, callback, reqcbk=reqcbk)
-        self.assertIsNone(result)
-        events.signal(CLIENT_UID)
-        time.sleep(1)  # wait for signal to arrive from server
-        flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
+    _client = txclient
 
-    def test_async_signal(self):
-        """
-        Test asynchronous registering of callbacks.
-        """
-        flag = Mock()
-
-        # executed after async signal, when response is received from server
-        def reqcbk(request, response):
-            flag(request.event, response.status)
-
-        # passing a callback as reqcbk param makes the call asynchronous
-        result = events.signal(CLIENT_UID, reqcbk=reqcbk)
-        self.assertIsNone(result)
-        time.sleep(1)  # wait for signal to arrive from server
-        flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
-
-    def test_async_unregister(self):
-        """
-        Test asynchronous unregistering of callbacks.
-        """
-        flag = Mock()
-
-        # executed after async signal, when response is received from server
-        def reqcbk(request, response):
-            flag(request.event, response.status)
-
-        # callback registered by application
-        def callback(request):
-            pass
-
-        # passing a callback as reqcbk param makes the call asynchronous
-        events.register(CLIENT_UID, callback)
-        result = events.unregister(CLIENT_UID, reqcbk=reqcbk)
-        self.assertIsNone(result)
-        time.sleep(1)  # wait for signal to arrive from server
-        flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
-
-    def test_async_ping_server(self):
-        """
-        Test asynchronous pinging of server.
-        """
-        flag = Mock()
-
-        # executed after async signal, when response is received from server
-        def reqcbk(request, response):
-            flag(response.status)
-
-        result = events.ping_server(reqcbk=reqcbk)
-        self.assertIsNone(result)
-        time.sleep(1)  # wait for response to arrive from server.
-        flag.assert_called_once_with(EventResponse.OK)
-
-    def test_async_ping_client(self):
-        """
-        Test asynchronous pinging of client.
-        """
-        flag = Mock()
 
-        # executed after async signal, when response is received from server
-        def reqcbk(request, response):
-            flag(response.status)
+class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
 
-        daemon = client.ensure_client_daemon()
-        result = events.ping_client(daemon.get_port(), reqcbk=reqcbk)
-        self.assertIsNone(result)
-        time.sleep(1)  # wait for response to arrive from server.
-        flag.assert_called_once_with(EventResponse.OK)
+    _client = client
diff --git a/src/leap/common/zmq_utils.py b/src/leap/common/zmq_utils.py
new file mode 100644 (file)
index 0000000..19625b9
--- /dev/null
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+# zmq.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+"""
+Utilities to handle ZMQ certificates.
+"""
+import os
+import logging
+import stat
+import shutil
+
+import zmq
+
+try:
+    import zmq.auth
+except ImportError:
+    pass
+
+from leap.common.files import mkdir_p
+from leap.common.check import leap_assert
+
+logger = logging.getLogger(__name__)
+
+
+KEYS_PREFIX = "zmq_certificates"
+PUBLIC_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "public_keys")
+PRIVATE_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "private_keys")
+
+
+def zmq_has_curve():
+    """
+    Return whether the current ZMQ has support for auth and CurveZMQ security.
+
+    :rtype: bool
+
+     Version notes:
+       `zmq.curve_keypair()` is new in version 14.0, new in version libzmq-4.0.
+            Requires libzmq (>= 4.0) to have been linked with libsodium.
+       `zmq.auth` module is new in version 14.1
+       `zmq.has()` is new in version 14.1, new in version libzmq-4.1.
+    """
+    zmq_version = zmq.zmq_version_info()
+    pyzmq_version = zmq.pyzmq_version_info()
+
+    if pyzmq_version >= (14, 1, 0) and zmq_version >= (4, 1):
+        return zmq.has('curve')
+
+    if pyzmq_version < (14, 1, 0):
+        return False
+
+    if zmq_version < (4, 0):
+        # security is new in libzmq 4.0
+        return False
+
+    try:
+        zmq.curve_keypair()
+    except zmq.error.ZMQError:
+        # security requires libzmq to be linked against libsodium
+        return False
+
+    return True
+
+
+def assert_zmq_has_curve():
+    leap_assert(zmq_has_curve, "CurveZMQ not supported!")
+
+
+def maybe_create_and_get_certificates(basedir, name):
+    """
+    Generate the needed ZMQ certificates for backend/frontend communication if
+    needed.
+    """
+    assert_zmq_has_curve()
+    private_keys_dir = os.path.join(basedir, PRIVATE_KEYS_PREFIX)
+    private_key = os.path.join(
+        private_keys_dir, name + ".key_secret")
+    if not os.path.isfile(private_key):
+        mkdir_p(private_keys_dir)
+        zmq.auth.create_certificates(private_keys_dir, name)
+        # set permissions to: 0700 (U:rwx G:--- O:---)
+        os.chmod(private_key, stat.S_IRUSR | stat.S_IWUSR)
+        # move public key to public keys directory
+        public_keys_dir = os.path.join(basedir, PUBLIC_KEYS_PREFIX)
+        old_public_key = os.path.join(
+            private_keys_dir, name + ".key")
+        new_public_key = os.path.join(
+            public_keys_dir, name + ".key")
+        mkdir_p(public_keys_dir)
+        shutil.move(old_public_key, new_public_key)
+    return zmq.auth.load_certificate(private_key)
+
+