From 1eae3af1906f95d9d2d76c205b61799d248f0e71 Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Tue, 8 Jul 2014 16:45:43 -0300 Subject: Add base communication framework. --- src/leap/bitmask/backend/signaler.py | 140 +++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 src/leap/bitmask/backend/signaler.py (limited to 'src/leap/bitmask/backend/signaler.py') diff --git a/src/leap/bitmask/backend/signaler.py b/src/leap/bitmask/backend/signaler.py new file mode 100644 index 00000000..d96015f4 --- /dev/null +++ b/src/leap/bitmask/backend/signaler.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python +# encoding: utf-8 +import Queue +import threading +import time + +import zmq + +from leap.bitmask.backend.api import SIGNALS +from leap.bitmask.backend.utils import get_frontend_certificates + +import logging +logger = logging.getLogger(__name__) + + +class Signaler(object): + """ + Signaler client. + Receives signals from the backend and sends to the signaling server. + """ + PORT = "5667" + SERVER = "tcp://localhost:%s" % PORT + + def __init__(self): + """ + Initialize the ZMQ socket to talk to the signaling server. + """ + context = zmq.Context() + logger.debug("Connecting to signaling server...") + socket = context.socket(zmq.REQ) + + # public, secret = zmq.curve_keypair() + client_keys = zmq.curve_keypair() + socket.curve_publickey = client_keys[0] + socket.curve_secretkey = client_keys[1] + + # The client must know the server's public key to make a CURVE + # connection. + public, _ = get_frontend_certificates() + socket.curve_serverkey = public + + socket.setsockopt(zmq.RCVTIMEO, 1000) + socket.connect(self.SERVER) + self._socket = socket + + self._signal_queue = Queue.Queue() + + self._do_work = threading.Event() # used to stop the worker thread. + self._worker_signaler = threading.Thread(target=self._worker) + + def __getattribute__(self, name): + """ + This allows the user to do: + S = Signaler() + S.SOME_SIGNAL + + Just by having defined 'some_signal' in _SIGNALS + + :param name: the attribute name that is requested. + :type name: str + """ + if name in SIGNALS: + return name + else: + return object.__getattribute__(self, name) + + def signal(self, signal, data=None): + """ + Sends a signal to the signaling server. + + :param signal: the signal to send. + :type signal: str + """ + if signal not in SIGNALS: + raise Exception("Unknown signal: '{0}'".format(signal)) + + request = { + 'signal': signal, + 'data': data, + } + + try: + request_json = zmq.utils.jsonapi.dumps(request) + except Exception as e: + msg = ("Error serializing request into JSON.\n" + "Exception: {0} Data: {1}") + msg = msg.format(e, request) + logger.critical(msg) + raise + + # queue the call in order to handle the request in a thread safe way. + self._signal_queue.put(request_json) + + def _worker(self): + """ + Worker loop that processes the Queue of pending requests to do. + """ + while self._do_work.is_set(): + try: + request = self._signal_queue.get(block=False) + self._send_request(request) + except Queue.Empty: + pass + time.sleep(0.01) + + logger.debug("Signaler thread stopped.") + + def start(self): + """ + Start the Signaler worker. + """ + self._do_work.set() + self._worker_signaler.start() + + def stop(self): + """ + Stop the Signaler worker. + """ + self._do_work.clear() + + def _send_request(self, request): + """ + Send the given request to the server. + This is used from a thread safe loop in order to avoid sending a + request without receiving a response from a previous one. + + :param request: the request to send. + :type request: str + """ + logger.debug("Signaling '{0}'".format(request)) + self._socket.send(request) + + # Get the reply. + try: + response = self._socket.recv() + msg = "Received reply for '{0}' -> '{1}'".format(request, response) + logger.debug(msg) + except zmq.error.Again as e: + msg = "Timeout error contacting signaler. {0!r}".format(e) + logger.critical(msg) -- cgit v1.2.3 From 4d5e56dacf33465df5d195ada2855c86ee23a21f Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Thu, 19 Jun 2014 16:54:34 -0300 Subject: Add license headers. --- src/leap/bitmask/backend/signaler.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'src/leap/bitmask/backend/signaler.py') diff --git a/src/leap/bitmask/backend/signaler.py b/src/leap/bitmask/backend/signaler.py index d96015f4..f8753771 100644 --- a/src/leap/bitmask/backend/signaler.py +++ b/src/leap/bitmask/backend/signaler.py @@ -1,5 +1,19 @@ -#!/usr/bin/env python -# encoding: utf-8 +# -*- coding: utf-8 -*- +# signaler.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . import Queue import threading import time -- cgit v1.2.3 From d33e9a2bc07ccd983a1321b254cc27ca6be989a3 Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Tue, 8 Jul 2014 17:01:18 -0300 Subject: Add file docstrings. --- src/leap/bitmask/backend/signaler.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/leap/bitmask/backend/signaler.py') diff --git a/src/leap/bitmask/backend/signaler.py b/src/leap/bitmask/backend/signaler.py index f8753771..6a0ec88a 100644 --- a/src/leap/bitmask/backend/signaler.py +++ b/src/leap/bitmask/backend/signaler.py @@ -14,6 +14,10 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . +""" +Signaler client. +Receives signals from the backend and sends to the signaling server. +""" import Queue import threading import time -- cgit v1.2.3 From d8152277022eedf5a88c84cc5f099b2fc9ad6e46 Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Wed, 9 Jul 2014 16:14:58 -0300 Subject: Comment out overly verbose logs for communication. --- src/leap/bitmask/backend/signaler.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'src/leap/bitmask/backend/signaler.py') diff --git a/src/leap/bitmask/backend/signaler.py b/src/leap/bitmask/backend/signaler.py index 6a0ec88a..f21f0998 100644 --- a/src/leap/bitmask/backend/signaler.py +++ b/src/leap/bitmask/backend/signaler.py @@ -145,14 +145,16 @@ class Signaler(object): :param request: the request to send. :type request: str """ - logger.debug("Signaling '{0}'".format(request)) + # logger.debug("Signaling '{0}'".format(request)) self._socket.send(request) # Get the reply. try: - response = self._socket.recv() - msg = "Received reply for '{0}' -> '{1}'".format(request, response) - logger.debug(msg) + self._socket.recv() + # response = self._socket.recv() + # msg = "Received reply for '{0}' -> '{1}'" + # msg = msg.format(request, response) + # logger.debug(msg) except zmq.error.Again as e: msg = "Timeout error contacting signaler. {0!r}".format(e) logger.critical(msg) -- cgit v1.2.3 From 7c7c2d49b172d08e106e297101142747eaf78944 Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Mon, 14 Jul 2014 16:23:20 -0300 Subject: Use polling to prevent communication issues. --- src/leap/bitmask/backend/signaler.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) (limited to 'src/leap/bitmask/backend/signaler.py') diff --git a/src/leap/bitmask/backend/signaler.py b/src/leap/bitmask/backend/signaler.py index f21f0998..7401f3a0 100644 --- a/src/leap/bitmask/backend/signaler.py +++ b/src/leap/bitmask/backend/signaler.py @@ -38,6 +38,8 @@ class Signaler(object): """ PORT = "5667" SERVER = "tcp://localhost:%s" % PORT + POLL_TIMEOUT = 1000 # ms + POLL_TRIES = 3 def __init__(self): """ @@ -148,13 +150,23 @@ class Signaler(object): # logger.debug("Signaling '{0}'".format(request)) self._socket.send(request) - # Get the reply. - try: - self._socket.recv() - # response = self._socket.recv() - # msg = "Received reply for '{0}' -> '{1}'" - # msg = msg.format(request, response) - # logger.debug(msg) - except zmq.error.Again as e: - msg = "Timeout error contacting signaler. {0!r}".format(e) + poll = zmq.Poller() + poll.register(self._socket, zmq.POLLIN) + + reply = None + tries = 0 + + while tries < self.POLL_TRIES: + socks = dict(poll.poll(self.POLL_TIMEOUT)) + if socks.get(self._socket) == zmq.POLLIN: + reply = self._socket.recv() + break + + tries += 1 + + if reply is None: + msg = "Timeout error contacting backend." logger.critical(msg) + # else: + # msg = "Received reply for '{0}' -> '{1}'".format(request, reply) + # logger.debug(msg) -- cgit v1.2.3 From 3d6629348aedf2a6863d242d96d64b3492e86f9a Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Wed, 16 Jul 2014 11:29:19 -0300 Subject: Increase timeout and retries. With this change we avoid the communication issues on OSX. --- src/leap/bitmask/backend/signaler.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'src/leap/bitmask/backend/signaler.py') diff --git a/src/leap/bitmask/backend/signaler.py b/src/leap/bitmask/backend/signaler.py index 7401f3a0..574bfa71 100644 --- a/src/leap/bitmask/backend/signaler.py +++ b/src/leap/bitmask/backend/signaler.py @@ -38,8 +38,8 @@ class Signaler(object): """ PORT = "5667" SERVER = "tcp://localhost:%s" % PORT - POLL_TIMEOUT = 1000 # ms - POLL_TRIES = 3 + POLL_TIMEOUT = 2000 # ms + POLL_TRIES = 500 def __init__(self): """ @@ -156,13 +156,18 @@ class Signaler(object): reply = None tries = 0 - while tries < self.POLL_TRIES: + while True: socks = dict(poll.poll(self.POLL_TIMEOUT)) if socks.get(self._socket) == zmq.POLLIN: reply = self._socket.recv() break tries += 1 + if tries < self.POLL_TRIES: + logger.warning('Retrying receive... {0}/{1}'.format( + tries, self.POLL_TRIES)) + else: + break if reply is None: msg = "Timeout error contacting backend." -- cgit v1.2.3