diff options
author | Ivan Alejandro <ivanalejandro0@gmail.com> | 2014-07-21 17:03:47 -0300 |
---|---|---|
committer | Ivan Alejandro <ivanalejandro0@gmail.com> | 2014-07-21 17:11:33 -0300 |
commit | 68b1be8ef443b088cf5c1f7f964e1bd7ad42408e (patch) | |
tree | ec882e6c61f1fe4a04ba9eca113c198f998f084f /src/leap/bitmask/backend/backend_proxy.py | |
parent | 159dbe295148975bdfe9a50f871254aa9adf2328 (diff) |
Add heartbeat to check if backend is alive.
Send a 'ping' request every 2 secs to ensure that the backend is
running.
Use polling instead of recv on the backend_proxy. This was already
implemented for the signaler.
Diffstat (limited to 'src/leap/bitmask/backend/backend_proxy.py')
-rw-r--r-- | src/leap/bitmask/backend/backend_proxy.py | 64 |
1 files changed, 54 insertions, 10 deletions
diff --git a/src/leap/bitmask/backend/backend_proxy.py b/src/leap/bitmask/backend/backend_proxy.py index f683e465..dc30d2cb 100644 --- a/src/leap/bitmask/backend/backend_proxy.py +++ b/src/leap/bitmask/backend/backend_proxy.py @@ -25,7 +25,7 @@ import time import zmq -from leap.bitmask.backend.api import API, STOP_REQUEST +from leap.bitmask.backend.api import API, STOP_REQUEST, PING_REQUEST from leap.bitmask.backend.utils import get_backend_certificates import logging @@ -40,6 +40,11 @@ class BackendProxy(object): PORT = '5556' SERVER = "tcp://localhost:%s" % PORT + POLL_TIMEOUT = 4000 # ms + POLL_TRIES = 3 + + PING_INTERVAL = 2 # secs + def __init__(self): self._socket = None @@ -62,6 +67,9 @@ class BackendProxy(object): socket.connect(self.SERVER) self._socket = socket + self._ping_at = 0 + self.online = False + self._call_queue = Queue.Queue() self._worker_caller = threading.Thread(target=self._worker) self._worker_caller.start() @@ -82,9 +90,26 @@ class BackendProxy(object): except Queue.Empty: pass time.sleep(0.01) + self._ping() logger.debug("BackendProxy worker stopped.") + def _reset_ping(self): + """ + Reset the ping timeout counter. + This is called for every ping and request. + """ + self._ping_at = time.time() + self.PING_INTERVAL + + def _ping(self): + """ + Heartbeat helper. + Sends a PING request just to know that the server is alive. + """ + if time.time() > self._ping_at: + self._send_request(PING_REQUEST) + self._reset_ping() + def _api_call(self, *args, **kwargs): """ Call the `api_method` method in backend (through zmq). @@ -134,16 +159,35 @@ class BackendProxy(object): # logger.debug("Sending request to backend: {0}".format(request)) self._socket.send(request) - try: - # Get the reply. - self._socket.recv() - # response = self._socket.recv() - # msg = "Received reply for '{0}' -> '{1}'" - # msg = msg.format(request, response) - # logger.debug(msg) - except zmq.error.Again as e: - msg = "Timeout error contacting backend. {0!r}".format(e) + poll = zmq.Poller() + poll.register(self._socket, zmq.POLLIN) + + reply = None + tries = 0 + + while True: + socks = dict(poll.poll(self.POLL_TIMEOUT)) + if socks.get(self._socket) == zmq.POLLIN: + reply = self._socket.recv() + break + + tries += 1 + if tries < self.POLL_TRIES: + logger.warning('Retrying receive... {0}/{1}'.format( + tries, self.POLL_TRIES)) + else: + break + + if reply is None: + msg = "Timeout error contacting backend." logger.critical(msg) + self.online = False + else: + # msg = "Received reply for '{0}' -> '{1}'".format(request, reply) + # logger.debug(msg) + self.online = True + # request received, no ping needed for other interval. + self._reset_ping() def __getattribute__(self, name): """ |