summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/backend/backend_proxy.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2015-01-28 16:17:51 -0200
committerdrebs <drebs@leap.se>2015-05-08 16:11:47 -0300
commit01b005b0642454e3d670089ed7e530eda8e9ef91 (patch)
treed4a99c7e581dbb512d2430db4f0f8d1c90ff310a /src/leap/bitmask/backend/backend_proxy.py
parent456941648223a14fe144264c27a5dce4e4e702e5 (diff)
[feat] use txzmq in backend
Before this commit, the backend used plain pyzmq bindings for communicating with the frontend. This implements the txzmq twisted-powered bindings instead. Closes: #6360
Diffstat (limited to 'src/leap/bitmask/backend/backend_proxy.py')
-rw-r--r--src/leap/bitmask/backend/backend_proxy.py239
1 files changed, 135 insertions, 104 deletions
diff --git a/src/leap/bitmask/backend/backend_proxy.py b/src/leap/bitmask/backend/backend_proxy.py
index 04046d3d..8d6930d6 100644
--- a/src/leap/bitmask/backend/backend_proxy.py
+++ b/src/leap/bitmask/backend/backend_proxy.py
@@ -21,11 +21,13 @@ to the backend.
# XXX should document the relationship to the API here.
import functools
-import Queue
import threading
-import time
import zmq
+from zmq.eventloop import ioloop
+from zmq.eventloop import zmqstream
+
+from taskthread import TimerTask
from leap.bitmask.backend.api import API, STOP_REQUEST, PING_REQUEST
from leap.bitmask.backend.settings import Settings
@@ -37,35 +39,39 @@ import logging
logger = logging.getLogger(__name__)
-class BackendProxy(object):
+class ZmqREQConnection(threading.Thread):
"""
- The BackendProxy handles calls from the GUI and forwards (through ZMQ)
- to the backend.
+ A threaded zmq req connection.
"""
- if flags.ZMQ_HAS_CURVE:
- PORT = '5556'
- SERVER = "tcp://localhost:%s" % PORT
- else:
- SERVER = "ipc:///tmp/bitmask.socket.0"
-
- POLL_TIMEOUT = 4000 # ms
- POLL_TRIES = 3
-
- PING_INTERVAL = 2 # secs
-
- def __init__(self):
- generate_zmq_certificates_if_needed()
-
- self._socket = None
+ def __init__(self, server_address, on_recv):
+ """
+ Initialize the connection.
- self.settings = Settings()
+ :param server_address: The address of the backend zmq server.
+ :type server: str
+ :param on_recv: The callback to be executed when a message is
+ received.
+ :type on_recv: callable(msg)
+ """
+ threading.Thread.__init__(self)
+ self._server_address = server_address
+ self._on_recv = on_recv
+ self._stream = None
+ self._init_zmq()
- # initialize ZMQ stuff:
+ def _init_zmq(self):
+ """
+ Configure the zmq components and connection.
+ """
+ logger.debug("Setting up ZMQ connection to server...")
context = zmq.Context()
- logger.debug("Connecting to server...")
socket = context.socket(zmq.REQ)
+ # we use zmq's eventloop in order to asynchronously send requests
+ loop = ioloop.ZMQIOLoop.current()
+ self._stream = zmqstream.ZMQStream(socket, loop)
+
if flags.ZMQ_HAS_CURVE:
# public, secret = zmq.curve_keypair()
client_keys = zmq.curve_keypair()
@@ -79,66 +85,128 @@ class BackendProxy(object):
socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.setsockopt(zmq.LINGER, 0) # Terminate early
- socket.connect(self.SERVER)
- self._socket = socket
- self._ping_at = 0
+ self._stream.on_recv(self._on_recv)
+
+ def run(self):
+ """
+ Run the threaded stream connection loop.
+ """
+ self._stream.socket.connect(self._server_address)
+ logger.debug("Starting ZMQ loop.")
+ self._stream.io_loop.start()
+ logger.debug("Finished ZMQ loop.")
+
+ def stop(self):
+ """
+ Stop the threaded connection loop.
+ """
+ self._stream.io_loop.stop()
+
+ def send(self, *args, **kwargs):
+ """
+ Send a message through this connection.
+ """
+ # Important note: calling send on the zmqstream from another
+ # thread doesn’t properly tell the IOLoop thread that there’s an
+ # event to process. This could cuase small delays if the IOLoop is
+ # already processing lots of events, but it can cause the message
+ # to never send if the zmq socket is the only one it’s handling.
+ #
+ # Because of that, we want ZmqREQConnection.send to hand off the
+ # stream.send to the IOLoop’s thread via IOLoop.add_callback:
+ self._stream.io_loop.add_callback(
+ lambda: self._stream.send(*args, **kwargs))
+
+
+class BackendProxy(object):
+ """
+ The BackendProxy handles calls from the GUI and forwards (through ZMQ)
+ to the backend.
+ """
+
+ if flags.ZMQ_HAS_CURVE:
+ PORT = '5556'
+ SERVER = "tcp://localhost:%s" % PORT
+ else:
+ SERVER = "ipc:///tmp/bitmask.socket.0"
+
+ PING_INTERVAL = 2 # secs
+
+ def __init__(self):
+ """
+ Initialize the backend proxy.
+ """
+ generate_zmq_certificates_if_needed()
+ self._do_work = threading.Event()
+ self._work_lock = threading.Lock()
+ self._connection = ZmqREQConnection(self.SERVER, self._set_online)
+ self._heartbeat = TimerTask(self._ping, delay=self.PING_INTERVAL)
+ self._ping_event = threading.Event()
self.online = False
+ self.settings = Settings()
- self._call_queue = Queue.Queue()
- self._worker_caller = threading.Thread(target=self._worker)
+ def _set_online(self, _):
+ """
+ Mark the backend as being online.
- def start(self):
- self._worker_caller.start()
+ This is used as the zmq connection's on_recv callback, and so it is
+ passed the received message as a parameter. Because we currently don't
+ use that message, we just ignore it for now.
+ """
+ self.online = True
+ # the following event is used when checking whether the backend is
+ # online
+ self._ping_event.set()
+
+ def _set_offline(self):
+ """
+ Mark the backend as being offline.
+ """
+ self.online = False
def check_online(self):
"""
Return whether the backend is accessible or not.
You don't need to do `run` in order to use this.
-
:rtype: bool
"""
- # we use a small timeout in order to response quickly if the backend is
- # offline
- self._send_request(PING_REQUEST, retry=False, timeout=500)
- self._socket.close()
+ logger.debug("Checking whether backend is online...")
+ self._send_request(PING_REQUEST)
+ # self._ping_event will eventually be set by the zmq connection's
+ # on_recv callback, so we use a small timeout in order to response
+ # quickly if the backend is offline
+ if not self._ping_event.wait(0.5):
+ logger.warning("Backend is offline!")
+ self._set_offline()
return self.online
- def _worker(self):
+ def start(self):
"""
- Worker loop that processes the Queue of pending requests to do.
+ Start the backend proxy.
"""
- while True:
- try:
- request = self._call_queue.get(block=False)
- # break the loop after sending the 'stop' action to the
- # backend.
- if request == STOP_REQUEST:
- break
-
- self._send_request(request)
- except Queue.Empty:
- pass
- time.sleep(0.01)
- self._ping()
+ logger.debug("Starting backend proxy...")
+ self._do_work.set()
+ self._connection.start()
+ self.check_online()
+ self._heartbeat.start()
- logger.debug("BackendProxy worker stopped.")
-
- def _reset_ping(self):
+ def _stop(self):
"""
- Reset the ping timeout counter.
- This is called for every ping and request.
+ Stop the backend proxy.
"""
- self._ping_at = time.time() + self.PING_INTERVAL
+ with self._work_lock: # avoid sending after connection was closed
+ self._do_work.clear()
+ self._heartbeat.stop()
+ self._connection.stop()
+ logger.debug("BackendProxy worker stopped.")
def _ping(self):
"""
Heartbeat helper.
Sends a PING request just to know that the server is alive.
"""
- if time.time() > self._ping_at:
- self._send_request(PING_REQUEST)
- self._reset_ping()
+ self._send_request(PING_REQUEST)
def _api_call(self, *args, **kwargs):
"""
@@ -162,6 +230,8 @@ class BackendProxy(object):
'arguments': kwargs,
}
+ request_json = None
+
try:
request_json = zmq.utils.jsonapi.dumps(request)
except Exception as e:
@@ -172,12 +242,12 @@ class BackendProxy(object):
raise
# queue the call in order to handle the request in a thread safe way.
- self._call_queue.put(request_json)
+ self._send_request(request_json)
if api_method == STOP_REQUEST:
- self._call_queue.put(STOP_REQUEST)
+ self._stop()
- def _send_request(self, request, retry=True, timeout=None):
+ def _send_request(self, request):
"""
Send the given request to the server.
This is used from a thread safe loop in order to avoid sending a
@@ -185,49 +255,10 @@ class BackendProxy(object):
:param request: the request to send.
:type request: str
- :param retry: whether we should retry or not in case of timeout.
- :type retry: bool
- :param timeout: a custom timeout (milliseconds) to wait for a response.
- :type timeout: int
"""
- # logger.debug("Sending request to backend: {0}".format(request))
- self._socket.send(request)
-
- poll = zmq.Poller()
- poll.register(self._socket, zmq.POLLIN)
-
- reply = None
-
- tries = 0
- if not retry:
- tries = self.POLL_TRIES + 1 # this means: no retries left
-
- if timeout is None:
- timeout = self.POLL_TIMEOUT
-
- while True:
- socks = dict(poll.poll(timeout))
- if socks.get(self._socket) == zmq.POLLIN:
- reply = self._socket.recv()
- break
-
- tries += 1
- if tries < self.POLL_TRIES:
- logger.warning('Retrying receive... {0}/{1}'.format(
- tries, self.POLL_TRIES))
- else:
- break
-
- if reply is None:
- msg = "Timeout error contacting backend."
- logger.critical(msg)
- self.online = False
- else:
- # msg = "Received reply for '{0}' -> '{1}'".format(request, reply)
- # logger.debug(msg)
- self.online = True
- # request received, no ping needed for other interval.
- self._reset_ping()
+ with self._work_lock: # avoid sending after connection was closed
+ if self._do_work.is_set():
+ self._connection.send(request)
def __getattribute__(self, name):
"""