From 01b005b0642454e3d670089ed7e530eda8e9ef91 Mon Sep 17 00:00:00 2001 From: drebs Date: Wed, 28 Jan 2015 16:17:51 -0200 Subject: [feat] use txzmq in backend Before this commit, the backend used plain pyzmq bindings for communicating with the frontend. This implements the txzmq twisted-powered bindings instead. Closes: #6360 --- src/leap/bitmask/backend/backend_proxy.py | 239 +++++++++++++++++------------- 1 file changed, 135 insertions(+), 104 deletions(-) (limited to 'src/leap/bitmask/backend/backend_proxy.py') diff --git a/src/leap/bitmask/backend/backend_proxy.py b/src/leap/bitmask/backend/backend_proxy.py index 04046d3d..8d6930d6 100644 --- a/src/leap/bitmask/backend/backend_proxy.py +++ b/src/leap/bitmask/backend/backend_proxy.py @@ -21,11 +21,13 @@ to the backend. # XXX should document the relationship to the API here. import functools -import Queue import threading -import time import zmq +from zmq.eventloop import ioloop +from zmq.eventloop import zmqstream + +from taskthread import TimerTask from leap.bitmask.backend.api import API, STOP_REQUEST, PING_REQUEST from leap.bitmask.backend.settings import Settings @@ -37,35 +39,39 @@ import logging logger = logging.getLogger(__name__) -class BackendProxy(object): +class ZmqREQConnection(threading.Thread): """ - The BackendProxy handles calls from the GUI and forwards (through ZMQ) - to the backend. + A threaded zmq req connection. """ - if flags.ZMQ_HAS_CURVE: - PORT = '5556' - SERVER = "tcp://localhost:%s" % PORT - else: - SERVER = "ipc:///tmp/bitmask.socket.0" - - POLL_TIMEOUT = 4000 # ms - POLL_TRIES = 3 - - PING_INTERVAL = 2 # secs - - def __init__(self): - generate_zmq_certificates_if_needed() - - self._socket = None + def __init__(self, server_address, on_recv): + """ + Initialize the connection. - self.settings = Settings() + :param server_address: The address of the backend zmq server. + :type server: str + :param on_recv: The callback to be executed when a message is + received. + :type on_recv: callable(msg) + """ + threading.Thread.__init__(self) + self._server_address = server_address + self._on_recv = on_recv + self._stream = None + self._init_zmq() - # initialize ZMQ stuff: + def _init_zmq(self): + """ + Configure the zmq components and connection. + """ + logger.debug("Setting up ZMQ connection to server...") context = zmq.Context() - logger.debug("Connecting to server...") socket = context.socket(zmq.REQ) + # we use zmq's eventloop in order to asynchronously send requests + loop = ioloop.ZMQIOLoop.current() + self._stream = zmqstream.ZMQStream(socket, loop) + if flags.ZMQ_HAS_CURVE: # public, secret = zmq.curve_keypair() client_keys = zmq.curve_keypair() @@ -79,66 +85,128 @@ class BackendProxy(object): socket.setsockopt(zmq.RCVTIMEO, 1000) socket.setsockopt(zmq.LINGER, 0) # Terminate early - socket.connect(self.SERVER) - self._socket = socket - self._ping_at = 0 + self._stream.on_recv(self._on_recv) + + def run(self): + """ + Run the threaded stream connection loop. + """ + self._stream.socket.connect(self._server_address) + logger.debug("Starting ZMQ loop.") + self._stream.io_loop.start() + logger.debug("Finished ZMQ loop.") + + def stop(self): + """ + Stop the threaded connection loop. + """ + self._stream.io_loop.stop() + + def send(self, *args, **kwargs): + """ + Send a message through this connection. + """ + # Important note: calling send on the zmqstream from another + # thread doesn’t properly tell the IOLoop thread that there’s an + # event to process. This could cuase small delays if the IOLoop is + # already processing lots of events, but it can cause the message + # to never send if the zmq socket is the only one it’s handling. + # + # Because of that, we want ZmqREQConnection.send to hand off the + # stream.send to the IOLoop’s thread via IOLoop.add_callback: + self._stream.io_loop.add_callback( + lambda: self._stream.send(*args, **kwargs)) + + +class BackendProxy(object): + """ + The BackendProxy handles calls from the GUI and forwards (through ZMQ) + to the backend. + """ + + if flags.ZMQ_HAS_CURVE: + PORT = '5556' + SERVER = "tcp://localhost:%s" % PORT + else: + SERVER = "ipc:///tmp/bitmask.socket.0" + + PING_INTERVAL = 2 # secs + + def __init__(self): + """ + Initialize the backend proxy. + """ + generate_zmq_certificates_if_needed() + self._do_work = threading.Event() + self._work_lock = threading.Lock() + self._connection = ZmqREQConnection(self.SERVER, self._set_online) + self._heartbeat = TimerTask(self._ping, delay=self.PING_INTERVAL) + self._ping_event = threading.Event() self.online = False + self.settings = Settings() - self._call_queue = Queue.Queue() - self._worker_caller = threading.Thread(target=self._worker) + def _set_online(self, _): + """ + Mark the backend as being online. - def start(self): - self._worker_caller.start() + This is used as the zmq connection's on_recv callback, and so it is + passed the received message as a parameter. Because we currently don't + use that message, we just ignore it for now. + """ + self.online = True + # the following event is used when checking whether the backend is + # online + self._ping_event.set() + + def _set_offline(self): + """ + Mark the backend as being offline. + """ + self.online = False def check_online(self): """ Return whether the backend is accessible or not. You don't need to do `run` in order to use this. - :rtype: bool """ - # we use a small timeout in order to response quickly if the backend is - # offline - self._send_request(PING_REQUEST, retry=False, timeout=500) - self._socket.close() + logger.debug("Checking whether backend is online...") + self._send_request(PING_REQUEST) + # self._ping_event will eventually be set by the zmq connection's + # on_recv callback, so we use a small timeout in order to response + # quickly if the backend is offline + if not self._ping_event.wait(0.5): + logger.warning("Backend is offline!") + self._set_offline() return self.online - def _worker(self): + def start(self): """ - Worker loop that processes the Queue of pending requests to do. + Start the backend proxy. """ - while True: - try: - request = self._call_queue.get(block=False) - # break the loop after sending the 'stop' action to the - # backend. - if request == STOP_REQUEST: - break - - self._send_request(request) - except Queue.Empty: - pass - time.sleep(0.01) - self._ping() + logger.debug("Starting backend proxy...") + self._do_work.set() + self._connection.start() + self.check_online() + self._heartbeat.start() - logger.debug("BackendProxy worker stopped.") - - def _reset_ping(self): + def _stop(self): """ - Reset the ping timeout counter. - This is called for every ping and request. + Stop the backend proxy. """ - self._ping_at = time.time() + self.PING_INTERVAL + with self._work_lock: # avoid sending after connection was closed + self._do_work.clear() + self._heartbeat.stop() + self._connection.stop() + logger.debug("BackendProxy worker stopped.") def _ping(self): """ Heartbeat helper. Sends a PING request just to know that the server is alive. """ - if time.time() > self._ping_at: - self._send_request(PING_REQUEST) - self._reset_ping() + self._send_request(PING_REQUEST) def _api_call(self, *args, **kwargs): """ @@ -162,6 +230,8 @@ class BackendProxy(object): 'arguments': kwargs, } + request_json = None + try: request_json = zmq.utils.jsonapi.dumps(request) except Exception as e: @@ -172,12 +242,12 @@ class BackendProxy(object): raise # queue the call in order to handle the request in a thread safe way. - self._call_queue.put(request_json) + self._send_request(request_json) if api_method == STOP_REQUEST: - self._call_queue.put(STOP_REQUEST) + self._stop() - def _send_request(self, request, retry=True, timeout=None): + def _send_request(self, request): """ Send the given request to the server. This is used from a thread safe loop in order to avoid sending a @@ -185,49 +255,10 @@ class BackendProxy(object): :param request: the request to send. :type request: str - :param retry: whether we should retry or not in case of timeout. - :type retry: bool - :param timeout: a custom timeout (milliseconds) to wait for a response. - :type timeout: int """ - # logger.debug("Sending request to backend: {0}".format(request)) - self._socket.send(request) - - poll = zmq.Poller() - poll.register(self._socket, zmq.POLLIN) - - reply = None - - tries = 0 - if not retry: - tries = self.POLL_TRIES + 1 # this means: no retries left - - if timeout is None: - timeout = self.POLL_TIMEOUT - - while True: - socks = dict(poll.poll(timeout)) - if socks.get(self._socket) == zmq.POLLIN: - reply = self._socket.recv() - break - - tries += 1 - if tries < self.POLL_TRIES: - logger.warning('Retrying receive... {0}/{1}'.format( - tries, self.POLL_TRIES)) - else: - break - - if reply is None: - msg = "Timeout error contacting backend." - logger.critical(msg) - self.online = False - else: - # msg = "Received reply for '{0}' -> '{1}'".format(request, reply) - # logger.debug(msg) - self.online = True - # request received, no ping needed for other interval. - self._reset_ping() + with self._work_lock: # avoid sending after connection was closed + if self._do_work.is_set(): + self._connection.send(request) def __getattribute__(self, name): """ -- cgit v1.2.3