summaryrefslogtreecommitdiff
path: root/src/leap/common/events/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/common/events/client.py')
-rw-r--r--src/leap/common/events/client.py54
1 files changed, 40 insertions, 14 deletions
diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py
index 0706fe3..60d24bc 100644
--- a/src/leap/common/events/client.py
+++ b/src/leap/common/events/client.py
@@ -14,8 +14,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-
"""
The client end point of the events mechanism.
@@ -27,8 +25,6 @@ When a client registers a callback for a given event, it also tells the
server that it wants to be notified whenever events of that type are sent by
some other client.
"""
-
-
import logging
import collections
import uuid
@@ -51,7 +47,7 @@ try:
except ImportError:
pass
-from leap.common.config import get_path_prefix
+from leap.common.config import flags, get_path_prefix
from leap.common.zmq_utils import zmq_has_curve
from leap.common.zmq_utils import maybe_create_and_get_certificates
from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
@@ -173,21 +169,38 @@ class EventsClient(object):
:param content: The content of the event.
:type content: list
"""
- logger.debug("Sending event: (%s, %s)" % (event, content))
- self._send(str(event) + b'\0' + pickle.dumps(content))
+ logger.debug("Emitting event: (%s, %s)" % (event, content))
+ payload = str(event) + b'\0' + pickle.dumps(content)
+ self._send(payload)
def _handle_event(self, event, content):
"""
Handle an incoming event.
- :param msg: The incoming message.
- :type msg: list(str)
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
"""
logger.debug("Handling event %s..." % event)
- for uid in self._callbacks[event].keys():
+ for uid in self._callbacks[event]:
callback = self._callbacks[event][uid]
logger.debug("Executing callback %s." % uid)
- callback(event, *content)
+ self._run_callback(callback, event, content)
+
+ @abstractmethod
+ def _run_callback(self, callback, event, content):
+ """
+ Run a callback.
+
+ :param callback: The callback to be run.
+ :type callback: callable(event, *content)
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ pass
@abstractmethod
def _subscribe(self, tag):
@@ -266,7 +279,7 @@ class EventsClientThread(threading.Thread, EventsClient):
self._lock = threading.Lock()
self._initialized = threading.Event()
self._config_prefix = os.path.join(
- get_path_prefix(), "leap", "events")
+ get_path_prefix(flags.STANDALONE), "leap", "events")
self._loop = None
self._context = None
self._push = None
@@ -368,10 +381,22 @@ class EventsClientThread(threading.Thread, EventsClient):
:param data: The data to be sent.
:type event: str
"""
- logger.debug("Sending data: %s" % data)
# add send() as a callback for ioloop so it works between threads
self._loop.add_callback(lambda: self._push.send(data))
+ def _run_callback(self, callback, event, content):
+ """
+ Run a callback.
+
+ :param callback: The callback to be run.
+ :type callback: callable(event, *content)
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ self._loop.add_callback(lambda: callback(event, *content))
+
def register(self, event, callback, uid=None, replace=False):
"""
Register a callback to be executed when an event is received.
@@ -393,7 +418,8 @@ class EventsClientThread(threading.Thread, EventsClient):
callback identified by the given uid and replace is False.
"""
self.ensure_client()
- return EventsClient.register(self, event, callback, uid=uid, replace=replace)
+ return EventsClient.register(
+ self, event, callback, uid=uid, replace=replace)
def unregister(self, event, uid=None):
"""