diff options
author | Micah Anderson <micah@riseup.net> | 2015-11-10 17:34:54 -0500 |
---|---|---|
committer | Micah Anderson <micah@riseup.net> | 2015-11-10 17:34:54 -0500 |
commit | 93ac9288e301643a4b9c31e2750a231f4e7bc8d8 (patch) | |
tree | 92d7ea2d18f288cd7dfd852ee6af8787c722c87c /src/leap/common/events | |
parent | 42814b5bf836a83724d2c74d6bb32bc168b7a81c (diff) | |
parent | e074eac10c6e08757857c770cb190cdb9d3a4583 (diff) |
Merge branch 'debian/experimental' into debian/platform-0.8
Diffstat (limited to 'src/leap/common/events')
-rw-r--r-- | src/leap/common/events/__init__.py | 33 | ||||
-rw-r--r-- | src/leap/common/events/client.py | 54 | ||||
-rw-r--r-- | src/leap/common/events/flags.py | 28 | ||||
-rw-r--r-- | src/leap/common/events/tests/__init__.py | 0 | ||||
-rw-r--r-- | src/leap/common/events/tests/test_zmq_components.py | 51 | ||||
-rw-r--r-- | src/leap/common/events/txclient.py | 19 | ||||
-rw-r--r-- | src/leap/common/events/zmq_components.py | 39 |
7 files changed, 186 insertions, 38 deletions
diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py index 9269b9a..f9ad5fa 100644 --- a/src/leap/common/events/__init__.py +++ b/src/leap/common/events/__init__.py @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ This is an events mechanism that uses a server to allow for emitting events between clients. @@ -37,13 +35,15 @@ To emit an event, use leap.common.events.emit(): >>> from leap.common.events import catalog >>> emit(catalog.CLIENT_UID) """ - - import logging import argparse from leap.common.events import client +from leap.common.events import txclient from leap.common.events import server +from leap.common.events import flags +from leap.common.events.flags import set_events_enabled + from leap.common.events import catalog @@ -52,6 +52,7 @@ __all__ = [ "unregister", "emit", "catalog", + "set_events_enabled" ] @@ -78,7 +79,13 @@ def register(event, callback, uid=None, replace=False): :raises CallbackAlreadyRegistered: when there's already a callback identified by the given uid and replace is False. """ - return client.register(event, callback, uid, replace) + if flags.EVENTS_ENABLED: + return client.register(event, callback, uid, replace) + + +def register_async(event, callback, uid=None, replace=False): + if flags.EVENTS_ENABLED: + return txclient.register(event, callback, uid, replace) def unregister(event, uid=None): @@ -93,7 +100,13 @@ def unregister(event, uid=None): :param uid: The callback uid. :type uid: str """ - return client.unregister(event, uid) + if flags.EVENTS_ENABLED: + return client.unregister(event, uid) + + +def unregister_async(event, uid=None): + if flags.EVENTS_ENABLED: + return txclient.unregister(event, uid) def emit(event, *content): @@ -105,7 +118,13 @@ def emit(event, *content): :param content: The content of the event. :type content: list """ - return client.emit(event, *content) + if flags.EVENTS_ENABLED: + return client.emit(event, *content) + + +def emit_async(event, *content): + if flags.EVENTS_ENABLED: + return txclient.emit(event, *content) if __name__ == "__main__": diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 0706fe3..60d24bc 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ The client end point of the events mechanism. @@ -27,8 +25,6 @@ When a client registers a callback for a given event, it also tells the server that it wants to be notified whenever events of that type are sent by some other client. """ - - import logging import collections import uuid @@ -51,7 +47,7 @@ try: except ImportError: pass -from leap.common.config import get_path_prefix +from leap.common.config import flags, get_path_prefix from leap.common.zmq_utils import zmq_has_curve from leap.common.zmq_utils import maybe_create_and_get_certificates from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX @@ -173,21 +169,38 @@ class EventsClient(object): :param content: The content of the event. :type content: list """ - logger.debug("Sending event: (%s, %s)" % (event, content)) - self._send(str(event) + b'\0' + pickle.dumps(content)) + logger.debug("Emitting event: (%s, %s)" % (event, content)) + payload = str(event) + b'\0' + pickle.dumps(content) + self._send(payload) def _handle_event(self, event, content): """ Handle an incoming event. - :param msg: The incoming message. - :type msg: list(str) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list """ logger.debug("Handling event %s..." % event) - for uid in self._callbacks[event].keys(): + for uid in self._callbacks[event]: callback = self._callbacks[event][uid] logger.debug("Executing callback %s." % uid) - callback(event, *content) + self._run_callback(callback, event, content) + + @abstractmethod + def _run_callback(self, callback, event, content): + """ + Run a callback. + + :param callback: The callback to be run. + :type callback: callable(event, *content) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + pass @abstractmethod def _subscribe(self, tag): @@ -266,7 +279,7 @@ class EventsClientThread(threading.Thread, EventsClient): self._lock = threading.Lock() self._initialized = threading.Event() self._config_prefix = os.path.join( - get_path_prefix(), "leap", "events") + get_path_prefix(flags.STANDALONE), "leap", "events") self._loop = None self._context = None self._push = None @@ -368,10 +381,22 @@ class EventsClientThread(threading.Thread, EventsClient): :param data: The data to be sent. :type event: str """ - logger.debug("Sending data: %s" % data) # add send() as a callback for ioloop so it works between threads self._loop.add_callback(lambda: self._push.send(data)) + def _run_callback(self, callback, event, content): + """ + Run a callback. + + :param callback: The callback to be run. + :type callback: callable(event, *content) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + self._loop.add_callback(lambda: callback(event, *content)) + def register(self, event, callback, uid=None, replace=False): """ Register a callback to be executed when an event is received. @@ -393,7 +418,8 @@ class EventsClientThread(threading.Thread, EventsClient): callback identified by the given uid and replace is False. """ self.ensure_client() - return EventsClient.register(self, event, callback, uid=uid, replace=replace) + return EventsClient.register( + self, event, callback, uid=uid, replace=replace) def unregister(self, event, uid=None): """ diff --git a/src/leap/common/events/flags.py b/src/leap/common/events/flags.py new file mode 100644 index 0000000..137f663 --- /dev/null +++ b/src/leap/common/events/flags.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Flags for the events framework. +""" +from leap.common.check import leap_assert + +EVENTS_ENABLED = True + + +def set_events_enabled(flag): + leap_assert(isinstance(flag, bool)) + global EVENTS_ENABLED + EVENTS_ENABLED = flag diff --git a/src/leap/common/events/tests/__init__.py b/src/leap/common/events/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/common/events/tests/__init__.py diff --git a/src/leap/common/events/tests/test_zmq_components.py b/src/leap/common/events/tests/test_zmq_components.py new file mode 100644 index 0000000..c51e37e --- /dev/null +++ b/src/leap/common/events/tests/test_zmq_components.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# test_zmq_components.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for the zmq_components module. +""" +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common.events import zmq_components + + +class AddrParseTestCase(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_addr_parsing(self): + addr_re = zmq_components.ADDRESS_RE + + self.assertEqual( + addr_re.search("ipc:///tmp/foo/bar/baaz-2/foo.0").groups(), + ("ipc", "/tmp/foo/bar/baaz-2/foo.0", None)) + self.assertEqual( + addr_re.search("tcp://localhost:9000").groups(), + ("tcp", "localhost", "9000")) + self.assertEqual( + addr_re.search("tcp://127.0.0.1:9000").groups(), + ("tcp", "127.0.0.1", "9000")) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py index 8206ed5..dfd0533 100644 --- a/src/leap/common/events/txclient.py +++ b/src/leap/common/events/txclient.py @@ -14,8 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - - """ The client end point of the events mechanism, implemented using txzmq. @@ -27,8 +25,6 @@ When a client registers a callback for a given event, it also tells the server that it wants to be notified whenever events of that type are sent by some other client. """ - - import logging import pickle @@ -62,7 +58,7 @@ class EventsTxClient(TxZmqClientComponent, EventsClient): """ def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR, - path_prefix=None): + path_prefix=None): """ Initialize the events server. """ @@ -112,6 +108,19 @@ class EventsTxClient(TxZmqClientComponent, EventsClient): """ self._push.send(data) + def _run_callback(self, callback, event, content): + """ + Run a callback. + + :param callback: The callback to be run. + :type callback: callable(event, *content) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + callback(event, *content) + def shutdown(self): TxZmqClientComponent.shutdown(self) EventsClient.shutdown(self) diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py index 4fb95d3..51de02c 100644 --- a/src/leap/common/events/zmq_components.py +++ b/src/leap/common/events/zmq_components.py @@ -25,6 +25,7 @@ import os import logging import txzmq import re +import time from abc import ABCMeta @@ -36,7 +37,7 @@ try: except ImportError: pass -from leap.common.config import get_path_prefix +from leap.common.config import flags, get_path_prefix from leap.common.zmq_utils import zmq_has_curve from leap.common.zmq_utils import maybe_create_and_get_certificates from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX @@ -45,7 +46,7 @@ from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX logger = logging.getLogger(__name__) -ADDRESS_RE = re.compile("(.+)://(.+):([0-9]+)") +ADDRESS_RE = re.compile("^([a-z]+)://([^:]+):?(\d+)?$") class TxZmqComponent(object): @@ -63,8 +64,8 @@ class TxZmqComponent(object): """ self._factory = txzmq.ZmqFactory() self._factory.registerForShutdown() - if path_prefix == None: - path_prefix = get_path_prefix() + if path_prefix is None: + path_prefix = get_path_prefix(flags.STANDALONE) self._config_prefix = os.path.join(path_prefix, "leap", "events") self._connections = [] @@ -125,15 +126,24 @@ class TxZmqComponent(object): socket.curve_publickey = public socket.curve_secretkey = secret self._start_thread_auth(connection.socket) - # check if port was given - protocol, addr, port = ADDRESS_RE.match(address).groups() - if port == "0": - port = socket.bind_to_random_port("%s://%s" % (protocol, addr)) + + proto, addr, port = ADDRESS_RE.search(address).groups() + + if proto == "tcp": + if port is None or port is '0': + params = proto, addr + port = socket.bind_to_random_port("%s://%s" % params) + logger.debug("Binded %s to %s://%s." % ((connClass,) + params)) + else: + params = proto, addr, int(port) + socket.bind("%s://%s:%d" % params) + logger.debug( + "Binded %s to %s://%s:%d." % ((connClass,) + params)) else: - socket.bind(address) - port = int(port) - logger.debug("Binded %s to %s://%s:%d." - % (connClass, protocol, addr, port)) + params = proto, addr + socket.bind("%s://%s" % params) + logger.debug( + "Binded %s to %s://%s" % ((connClass,) + params)) self._connections.append(connection) return connection, port @@ -145,6 +155,11 @@ class TxZmqComponent(object): :type socket: zmq.Socket """ authenticator = ThreadAuthenticator(self._factory.context) + + # Temporary fix until we understand what the problem is + # See https://leap.se/code/issues/7536 + time.sleep(0.5) + authenticator.start() # XXX do not hardcode this here. authenticator.allow('127.0.0.1') |