summaryrefslogtreecommitdiff
path: root/src/leap/common/events
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/common/events')
-rw-r--r--src/leap/common/events/__init__.py33
-rw-r--r--src/leap/common/events/client.py54
-rw-r--r--src/leap/common/events/flags.py28
-rw-r--r--src/leap/common/events/tests/__init__.py0
-rw-r--r--src/leap/common/events/tests/test_zmq_components.py51
-rw-r--r--src/leap/common/events/txclient.py19
-rw-r--r--src/leap/common/events/zmq_components.py39
7 files changed, 186 insertions, 38 deletions
diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py
index 9269b9a..f9ad5fa 100644
--- a/src/leap/common/events/__init__.py
+++ b/src/leap/common/events/__init__.py
@@ -14,8 +14,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
This is an events mechanism that uses a server to allow for emitting events
between clients.
@@ -37,13 +35,15 @@ To emit an event, use leap.common.events.emit():
>>> from leap.common.events import catalog
>>> emit(catalog.CLIENT_UID)
"""
-
-
import logging
import argparse
from leap.common.events import client
+from leap.common.events import txclient
from leap.common.events import server
+from leap.common.events import flags
+from leap.common.events.flags import set_events_enabled
+
from leap.common.events import catalog
@@ -52,6 +52,7 @@ __all__ = [
"unregister",
"emit",
"catalog",
+ "set_events_enabled"
]
@@ -78,7 +79,13 @@ def register(event, callback, uid=None, replace=False):
:raises CallbackAlreadyRegistered: when there's already a callback
identified by the given uid and replace is False.
"""
- return client.register(event, callback, uid, replace)
+ if flags.EVENTS_ENABLED:
+ return client.register(event, callback, uid, replace)
+
+
+def register_async(event, callback, uid=None, replace=False):
+ if flags.EVENTS_ENABLED:
+ return txclient.register(event, callback, uid, replace)
def unregister(event, uid=None):
@@ -93,7 +100,13 @@ def unregister(event, uid=None):
:param uid: The callback uid.
:type uid: str
"""
- return client.unregister(event, uid)
+ if flags.EVENTS_ENABLED:
+ return client.unregister(event, uid)
+
+
+def unregister_async(event, uid=None):
+ if flags.EVENTS_ENABLED:
+ return txclient.unregister(event, uid)
def emit(event, *content):
@@ -105,7 +118,13 @@ def emit(event, *content):
:param content: The content of the event.
:type content: list
"""
- return client.emit(event, *content)
+ if flags.EVENTS_ENABLED:
+ return client.emit(event, *content)
+
+
+def emit_async(event, *content):
+ if flags.EVENTS_ENABLED:
+ return txclient.emit(event, *content)
if __name__ == "__main__":
diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py
index 0706fe3..60d24bc 100644
--- a/src/leap/common/events/client.py
+++ b/src/leap/common/events/client.py
@@ -14,8 +14,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
The client end point of the events mechanism.
@@ -27,8 +25,6 @@ When a client registers a callback for a given event, it also tells the
server that it wants to be notified whenever events of that type are sent by
some other client.
"""
-
-
import logging
import collections
import uuid
@@ -51,7 +47,7 @@ try:
except ImportError:
pass
-from leap.common.config import get_path_prefix
+from leap.common.config import flags, get_path_prefix
from leap.common.zmq_utils import zmq_has_curve
from leap.common.zmq_utils import maybe_create_and_get_certificates
from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
@@ -173,21 +169,38 @@ class EventsClient(object):
:param content: The content of the event.
:type content: list
"""
- logger.debug("Sending event: (%s, %s)" % (event, content))
- self._send(str(event) + b'\0' + pickle.dumps(content))
+ logger.debug("Emitting event: (%s, %s)" % (event, content))
+ payload = str(event) + b'\0' + pickle.dumps(content)
+ self._send(payload)
def _handle_event(self, event, content):
"""
Handle an incoming event.
- :param msg: The incoming message.
- :type msg: list(str)
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
"""
logger.debug("Handling event %s..." % event)
- for uid in self._callbacks[event].keys():
+ for uid in self._callbacks[event]:
callback = self._callbacks[event][uid]
logger.debug("Executing callback %s." % uid)
- callback(event, *content)
+ self._run_callback(callback, event, content)
+
+ @abstractmethod
+ def _run_callback(self, callback, event, content):
+ """
+ Run a callback.
+
+ :param callback: The callback to be run.
+ :type callback: callable(event, *content)
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ pass
@abstractmethod
def _subscribe(self, tag):
@@ -266,7 +279,7 @@ class EventsClientThread(threading.Thread, EventsClient):
self._lock = threading.Lock()
self._initialized = threading.Event()
self._config_prefix = os.path.join(
- get_path_prefix(), "leap", "events")
+ get_path_prefix(flags.STANDALONE), "leap", "events")
self._loop = None
self._context = None
self._push = None
@@ -368,10 +381,22 @@ class EventsClientThread(threading.Thread, EventsClient):
:param data: The data to be sent.
:type event: str
"""
- logger.debug("Sending data: %s" % data)
# add send() as a callback for ioloop so it works between threads
self._loop.add_callback(lambda: self._push.send(data))
+ def _run_callback(self, callback, event, content):
+ """
+ Run a callback.
+
+ :param callback: The callback to be run.
+ :type callback: callable(event, *content)
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ self._loop.add_callback(lambda: callback(event, *content))
+
def register(self, event, callback, uid=None, replace=False):
"""
Register a callback to be executed when an event is received.
@@ -393,7 +418,8 @@ class EventsClientThread(threading.Thread, EventsClient):
callback identified by the given uid and replace is False.
"""
self.ensure_client()
- return EventsClient.register(self, event, callback, uid=uid, replace=replace)
+ return EventsClient.register(
+ self, event, callback, uid=uid, replace=replace)
def unregister(self, event, uid=None):
"""
diff --git a/src/leap/common/events/flags.py b/src/leap/common/events/flags.py
new file mode 100644
index 0000000..137f663
--- /dev/null
+++ b/src/leap/common/events/flags.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+# __init__.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Flags for the events framework.
+"""
+from leap.common.check import leap_assert
+
+EVENTS_ENABLED = True
+
+
+def set_events_enabled(flag):
+ leap_assert(isinstance(flag, bool))
+ global EVENTS_ENABLED
+ EVENTS_ENABLED = flag
diff --git a/src/leap/common/events/tests/__init__.py b/src/leap/common/events/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/leap/common/events/tests/__init__.py
diff --git a/src/leap/common/events/tests/test_zmq_components.py b/src/leap/common/events/tests/test_zmq_components.py
new file mode 100644
index 0000000..c51e37e
--- /dev/null
+++ b/src/leap/common/events/tests/test_zmq_components.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+# test_zmq_components.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the zmq_components module.
+"""
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.common.events import zmq_components
+
+
+class AddrParseTestCase(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_addr_parsing(self):
+ addr_re = zmq_components.ADDRESS_RE
+
+ self.assertEqual(
+ addr_re.search("ipc:///tmp/foo/bar/baaz-2/foo.0").groups(),
+ ("ipc", "/tmp/foo/bar/baaz-2/foo.0", None))
+ self.assertEqual(
+ addr_re.search("tcp://localhost:9000").groups(),
+ ("tcp", "localhost", "9000"))
+ self.assertEqual(
+ addr_re.search("tcp://127.0.0.1:9000").groups(),
+ ("tcp", "127.0.0.1", "9000"))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py
index 8206ed5..dfd0533 100644
--- a/src/leap/common/events/txclient.py
+++ b/src/leap/common/events/txclient.py
@@ -14,8 +14,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
The client end point of the events mechanism, implemented using txzmq.
@@ -27,8 +25,6 @@ When a client registers a callback for a given event, it also tells the
server that it wants to be notified whenever events of that type are sent by
some other client.
"""
-
-
import logging
import pickle
@@ -62,7 +58,7 @@ class EventsTxClient(TxZmqClientComponent, EventsClient):
"""
def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
- path_prefix=None):
+ path_prefix=None):
"""
Initialize the events server.
"""
@@ -112,6 +108,19 @@ class EventsTxClient(TxZmqClientComponent, EventsClient):
"""
self._push.send(data)
+ def _run_callback(self, callback, event, content):
+ """
+ Run a callback.
+
+ :param callback: The callback to be run.
+ :type callback: callable(event, *content)
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ callback(event, *content)
+
def shutdown(self):
TxZmqClientComponent.shutdown(self)
EventsClient.shutdown(self)
diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py
index 4fb95d3..51de02c 100644
--- a/src/leap/common/events/zmq_components.py
+++ b/src/leap/common/events/zmq_components.py
@@ -25,6 +25,7 @@ import os
import logging
import txzmq
import re
+import time
from abc import ABCMeta
@@ -36,7 +37,7 @@ try:
except ImportError:
pass
-from leap.common.config import get_path_prefix
+from leap.common.config import flags, get_path_prefix
from leap.common.zmq_utils import zmq_has_curve
from leap.common.zmq_utils import maybe_create_and_get_certificates
from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
@@ -45,7 +46,7 @@ from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
logger = logging.getLogger(__name__)
-ADDRESS_RE = re.compile("(.+)://(.+):([0-9]+)")
+ADDRESS_RE = re.compile("^([a-z]+)://([^:]+):?(\d+)?$")
class TxZmqComponent(object):
@@ -63,8 +64,8 @@ class TxZmqComponent(object):
"""
self._factory = txzmq.ZmqFactory()
self._factory.registerForShutdown()
- if path_prefix == None:
- path_prefix = get_path_prefix()
+ if path_prefix is None:
+ path_prefix = get_path_prefix(flags.STANDALONE)
self._config_prefix = os.path.join(path_prefix, "leap", "events")
self._connections = []
@@ -125,15 +126,24 @@ class TxZmqComponent(object):
socket.curve_publickey = public
socket.curve_secretkey = secret
self._start_thread_auth(connection.socket)
- # check if port was given
- protocol, addr, port = ADDRESS_RE.match(address).groups()
- if port == "0":
- port = socket.bind_to_random_port("%s://%s" % (protocol, addr))
+
+ proto, addr, port = ADDRESS_RE.search(address).groups()
+
+ if proto == "tcp":
+ if port is None or port is '0':
+ params = proto, addr
+ port = socket.bind_to_random_port("%s://%s" % params)
+ logger.debug("Binded %s to %s://%s." % ((connClass,) + params))
+ else:
+ params = proto, addr, int(port)
+ socket.bind("%s://%s:%d" % params)
+ logger.debug(
+ "Binded %s to %s://%s:%d." % ((connClass,) + params))
else:
- socket.bind(address)
- port = int(port)
- logger.debug("Binded %s to %s://%s:%d."
- % (connClass, protocol, addr, port))
+ params = proto, addr
+ socket.bind("%s://%s" % params)
+ logger.debug(
+ "Binded %s to %s://%s" % ((connClass,) + params))
self._connections.append(connection)
return connection, port
@@ -145,6 +155,11 @@ class TxZmqComponent(object):
:type socket: zmq.Socket
"""
authenticator = ThreadAuthenticator(self._factory.context)
+
+ # Temporary fix until we understand what the problem is
+ # See https://leap.se/code/issues/7536
+ time.sleep(0.5)
+
authenticator.start()
# XXX do not hardcode this here.
authenticator.allow('127.0.0.1')