summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/backend
diff options
context:
space:
mode:
authorIvan Alejandro <ivanalejandro0@gmail.com>2014-07-21 17:03:47 -0300
committerIvan Alejandro <ivanalejandro0@gmail.com>2014-07-21 17:11:33 -0300
commit68b1be8ef443b088cf5c1f7f964e1bd7ad42408e (patch)
treeec882e6c61f1fe4a04ba9eca113c198f998f084f /src/leap/bitmask/backend
parent159dbe295148975bdfe9a50f871254aa9adf2328 (diff)
Add heartbeat to check if backend is alive.
Send a 'ping' request every 2 secs to ensure that the backend is running. Use polling instead of recv on the backend_proxy. This was already implemented for the signaler.
Diffstat (limited to 'src/leap/bitmask/backend')
-rw-r--r--src/leap/bitmask/backend/api.py2
-rw-r--r--src/leap/bitmask/backend/backend.py6
-rw-r--r--src/leap/bitmask/backend/backend_proxy.py64
3 files changed, 61 insertions, 11 deletions
diff --git a/src/leap/bitmask/backend/api.py b/src/leap/bitmask/backend/api.py
index b8533f36..4f52e470 100644
--- a/src/leap/bitmask/backend/api.py
+++ b/src/leap/bitmask/backend/api.py
@@ -18,10 +18,12 @@
Backend available API and SIGNALS definition.
"""
STOP_REQUEST = "stop"
+PING_REQUEST = "PING"
API = (
STOP_REQUEST, # this method needs to be defined in order to support the
# backend stop action
+ PING_REQUEST,
"eip_can_start",
"eip_cancel_setup",
diff --git a/src/leap/bitmask/backend/backend.py b/src/leap/bitmask/backend/backend.py
index 833f4368..c895f8f5 100644
--- a/src/leap/bitmask/backend/backend.py
+++ b/src/leap/bitmask/backend/backend.py
@@ -23,7 +23,7 @@ from twisted.internet import defer, reactor, threads
import zmq
from zmq.auth.thread import ThreadAuthenticator
-from leap.bitmask.backend.api import API
+from leap.bitmask.backend.api import API, PING_REQUEST
from leap.bitmask.backend.utils import get_backend_certificates
from leap.bitmask.backend.signaler import Signaler
@@ -146,6 +146,10 @@ class Backend(object):
:param request_json: a json specification of a request.
:type request_json: str
"""
+ if request_json == PING_REQUEST:
+ # do not process request if it's just a ping
+ return
+
try:
# request = zmq.utils.jsonapi.loads(request_json)
# We use stdlib's json to ensure that we get unicode strings
diff --git a/src/leap/bitmask/backend/backend_proxy.py b/src/leap/bitmask/backend/backend_proxy.py
index f683e465..dc30d2cb 100644
--- a/src/leap/bitmask/backend/backend_proxy.py
+++ b/src/leap/bitmask/backend/backend_proxy.py
@@ -25,7 +25,7 @@ import time
import zmq
-from leap.bitmask.backend.api import API, STOP_REQUEST
+from leap.bitmask.backend.api import API, STOP_REQUEST, PING_REQUEST
from leap.bitmask.backend.utils import get_backend_certificates
import logging
@@ -40,6 +40,11 @@ class BackendProxy(object):
PORT = '5556'
SERVER = "tcp://localhost:%s" % PORT
+ POLL_TIMEOUT = 4000 # ms
+ POLL_TRIES = 3
+
+ PING_INTERVAL = 2 # secs
+
def __init__(self):
self._socket = None
@@ -62,6 +67,9 @@ class BackendProxy(object):
socket.connect(self.SERVER)
self._socket = socket
+ self._ping_at = 0
+ self.online = False
+
self._call_queue = Queue.Queue()
self._worker_caller = threading.Thread(target=self._worker)
self._worker_caller.start()
@@ -82,9 +90,26 @@ class BackendProxy(object):
except Queue.Empty:
pass
time.sleep(0.01)
+ self._ping()
logger.debug("BackendProxy worker stopped.")
+ def _reset_ping(self):
+ """
+ Reset the ping timeout counter.
+ This is called for every ping and request.
+ """
+ self._ping_at = time.time() + self.PING_INTERVAL
+
+ def _ping(self):
+ """
+ Heartbeat helper.
+ Sends a PING request just to know that the server is alive.
+ """
+ if time.time() > self._ping_at:
+ self._send_request(PING_REQUEST)
+ self._reset_ping()
+
def _api_call(self, *args, **kwargs):
"""
Call the `api_method` method in backend (through zmq).
@@ -134,16 +159,35 @@ class BackendProxy(object):
# logger.debug("Sending request to backend: {0}".format(request))
self._socket.send(request)
- try:
- # Get the reply.
- self._socket.recv()
- # response = self._socket.recv()
- # msg = "Received reply for '{0}' -> '{1}'"
- # msg = msg.format(request, response)
- # logger.debug(msg)
- except zmq.error.Again as e:
- msg = "Timeout error contacting backend. {0!r}".format(e)
+ poll = zmq.Poller()
+ poll.register(self._socket, zmq.POLLIN)
+
+ reply = None
+ tries = 0
+
+ while True:
+ socks = dict(poll.poll(self.POLL_TIMEOUT))
+ if socks.get(self._socket) == zmq.POLLIN:
+ reply = self._socket.recv()
+ break
+
+ tries += 1
+ if tries < self.POLL_TRIES:
+ logger.warning('Retrying receive... {0}/{1}'.format(
+ tries, self.POLL_TRIES))
+ else:
+ break
+
+ if reply is None:
+ msg = "Timeout error contacting backend."
logger.critical(msg)
+ self.online = False
+ else:
+ # msg = "Received reply for '{0}' -> '{1}'".format(request, reply)
+ # logger.debug(msg)
+ self.online = True
+ # request received, no ping needed for other interval.
+ self._reset_ping()
def __getattribute__(self, name):
"""