summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/backend/backend_proxy.py
diff options
context:
space:
mode:
authorIvan Alejandro <ivanalejandro0@gmail.com>2014-07-21 17:03:47 -0300
committerIvan Alejandro <ivanalejandro0@gmail.com>2014-07-21 17:11:33 -0300
commit68b1be8ef443b088cf5c1f7f964e1bd7ad42408e (patch)
treeec882e6c61f1fe4a04ba9eca113c198f998f084f /src/leap/bitmask/backend/backend_proxy.py
parent159dbe295148975bdfe9a50f871254aa9adf2328 (diff)
Add heartbeat to check if backend is alive.
Send a 'ping' request every 2 secs to ensure that the backend is running. Use polling instead of recv on the backend_proxy. This was already implemented for the signaler.
Diffstat (limited to 'src/leap/bitmask/backend/backend_proxy.py')
-rw-r--r--src/leap/bitmask/backend/backend_proxy.py64
1 files changed, 54 insertions, 10 deletions
diff --git a/src/leap/bitmask/backend/backend_proxy.py b/src/leap/bitmask/backend/backend_proxy.py
index f683e465..dc30d2cb 100644
--- a/src/leap/bitmask/backend/backend_proxy.py
+++ b/src/leap/bitmask/backend/backend_proxy.py
@@ -25,7 +25,7 @@ import time
import zmq
-from leap.bitmask.backend.api import API, STOP_REQUEST
+from leap.bitmask.backend.api import API, STOP_REQUEST, PING_REQUEST
from leap.bitmask.backend.utils import get_backend_certificates
import logging
@@ -40,6 +40,11 @@ class BackendProxy(object):
PORT = '5556'
SERVER = "tcp://localhost:%s" % PORT
+ POLL_TIMEOUT = 4000 # ms
+ POLL_TRIES = 3
+
+ PING_INTERVAL = 2 # secs
+
def __init__(self):
self._socket = None
@@ -62,6 +67,9 @@ class BackendProxy(object):
socket.connect(self.SERVER)
self._socket = socket
+ self._ping_at = 0
+ self.online = False
+
self._call_queue = Queue.Queue()
self._worker_caller = threading.Thread(target=self._worker)
self._worker_caller.start()
@@ -82,9 +90,26 @@ class BackendProxy(object):
except Queue.Empty:
pass
time.sleep(0.01)
+ self._ping()
logger.debug("BackendProxy worker stopped.")
+ def _reset_ping(self):
+ """
+ Reset the ping timeout counter.
+ This is called for every ping and request.
+ """
+ self._ping_at = time.time() + self.PING_INTERVAL
+
+ def _ping(self):
+ """
+ Heartbeat helper.
+ Sends a PING request just to know that the server is alive.
+ """
+ if time.time() > self._ping_at:
+ self._send_request(PING_REQUEST)
+ self._reset_ping()
+
def _api_call(self, *args, **kwargs):
"""
Call the `api_method` method in backend (through zmq).
@@ -134,16 +159,35 @@ class BackendProxy(object):
# logger.debug("Sending request to backend: {0}".format(request))
self._socket.send(request)
- try:
- # Get the reply.
- self._socket.recv()
- # response = self._socket.recv()
- # msg = "Received reply for '{0}' -> '{1}'"
- # msg = msg.format(request, response)
- # logger.debug(msg)
- except zmq.error.Again as e:
- msg = "Timeout error contacting backend. {0!r}".format(e)
+ poll = zmq.Poller()
+ poll.register(self._socket, zmq.POLLIN)
+
+ reply = None
+ tries = 0
+
+ while True:
+ socks = dict(poll.poll(self.POLL_TIMEOUT))
+ if socks.get(self._socket) == zmq.POLLIN:
+ reply = self._socket.recv()
+ break
+
+ tries += 1
+ if tries < self.POLL_TRIES:
+ logger.warning('Retrying receive... {0}/{1}'.format(
+ tries, self.POLL_TRIES))
+ else:
+ break
+
+ if reply is None:
+ msg = "Timeout error contacting backend."
logger.critical(msg)
+ self.online = False
+ else:
+ # msg = "Received reply for '{0}' -> '{1}'".format(request, reply)
+ # logger.debug(msg)
+ self.online = True
+ # request received, no ping needed for other interval.
+ self._reset_ping()
def __getattribute__(self, name):
"""