diff options
author | drebs <drebs@leap.se> | 2015-01-28 16:17:51 -0200 |
---|---|---|
committer | drebs <drebs@leap.se> | 2015-05-08 16:11:47 -0300 |
commit | 01b005b0642454e3d670089ed7e530eda8e9ef91 (patch) | |
tree | d4a99c7e581dbb512d2430db4f0f8d1c90ff310a /src/leap/bitmask/backend/backend.py | |
parent | 456941648223a14fe144264c27a5dce4e4e702e5 (diff) |
[feat] use txzmq in backend
Before this commit, the backend used plain pyzmq bindings for communicating
with the frontend. This implements the txzmq twisted-powered bindings instead.
Closes: #6360
Diffstat (limited to 'src/leap/bitmask/backend/backend.py')
-rw-r--r-- | src/leap/bitmask/backend/backend.py | 128 |
1 files changed, 65 insertions, 63 deletions
diff --git a/src/leap/bitmask/backend/backend.py b/src/leap/bitmask/backend/backend.py index cff731ba..fcbb19dc 100644 --- a/src/leap/bitmask/backend/backend.py +++ b/src/leap/bitmask/backend/backend.py @@ -17,17 +17,16 @@ # FIXME this is missing module documentation. It would be fine to say a couple # of lines about the whole backend architecture. -# TODO use txzmq bindings instead. import json import os -import threading import time import psutil -from twisted.internet import defer, reactor, threads +from twisted.internet import defer, reactor, threads, task +import txzmq import zmq try: from zmq.auth.thread import ThreadAuthenticator @@ -43,43 +42,42 @@ import logging logger = logging.getLogger(__name__) -class Backend(object): +class TxZmqREPConnection(object): """ - Backend server. - Receives signals from backend_proxy and emit signals if needed. + A twisted based zmq rep connection. """ - # XXX we might want to make this configurable per-platform, - # and use the most performant socket type on each one. - if flags.ZMQ_HAS_CURVE: - # XXX this should not be hardcoded. Make it configurable. - PORT = '5556' - BIND_ADDR = "tcp://127.0.0.1:%s" % PORT - else: - SOCKET_FILE = "/tmp/bitmask.socket.0" - BIND_ADDR = "ipc://%s" % SOCKET_FILE - PING_INTERVAL = 2 # secs - - def __init__(self, frontend_pid=None): + def __init__(self, server_address, process_request): """ - Backend constructor, create needed instances. + Initialize the connection. + + :param server_address: The address of the backend zmq server. + :type server: str + :param process_request: A callable used to process incoming requests. + :type process_request: callable(messageParts) """ - self._signaler = Signaler() + self._server_address = server_address + self._process_request = process_request + self._zmq_factory = None + self._zmq_connection = None + self._init_txzmq() - self._frontend_pid = frontend_pid + def _init_txzmq(self): + """ + Configure the txzmq components and connection. + """ + self._zmq_factory = txzmq.ZmqFactory() + self._zmq_factory.registerForShutdown() + self._zmq_connection = txzmq.ZmqREPConnection(self._zmq_factory) - self._do_work = threading.Event() # used to stop the worker thread. - self._zmq_socket = None + context = self._zmq_factory.context + socket = self._zmq_connection.socket - self._ongoing_defers = [] - self._init_zmq() + def _gotMessage(messageId, messageParts): + self._zmq_connection.reply(messageId, "OK") + self._process_request(messageParts) - def _init_zmq(self): - """ - Configure the zmq components and connection. - """ - context = zmq.Context() - socket = context.socket(zmq.REP) + self._zmq_connection.gotMessage = _gotMessage if flags.ZMQ_HAS_CURVE: # Start an authenticator for this context. @@ -95,37 +93,39 @@ class Backend(object): socket.curve_secretkey = secret socket.curve_server = True # must come before bind - socket.bind(self.BIND_ADDR) - if not flags.ZMQ_HAS_CURVE: - os.chmod(self.SOCKET_FILE, 0600) + proto, addr = self._server_address.split('://') # tcp/ipc, ip/socket + socket.bind(self._server_address) + if proto == 'ipc': + os.chmod(addr, 0600) - self._zmq_socket = socket - def _worker(self): - """ - Receive requests and send it to process. +class Backend(object): + """ + Backend server. + Receives signals from backend_proxy and emit signals if needed. + """ + # XXX we might want to make this configurable per-platform, + # and use the most performant socket type on each one. + if flags.ZMQ_HAS_CURVE: + # XXX this should not be hardcoded. Make it configurable. + PORT = '5556' + BIND_ADDR = "tcp://127.0.0.1:%s" % PORT + else: + SOCKET_FILE = "/tmp/bitmask.socket.0" + BIND_ADDR = "ipc://%s" % SOCKET_FILE - Note: we use a simple while since is less resource consuming than a - Twisted's LoopingCall. + PING_INTERVAL = 2 # secs + + def __init__(self, frontend_pid=None): """ - pid = self._frontend_pid - check_wait = 0 - while self._do_work.is_set(): - # Wait for next request from client - try: - request = self._zmq_socket.recv(zmq.NOBLOCK) - self._zmq_socket.send("OK") - # logger.debug("Received request: '{0}'".format(request)) - self._process_request(request) - except zmq.ZMQError as e: - if e.errno != zmq.EAGAIN: - raise - time.sleep(0.01) - - check_wait += 0.01 - if pid is not None and check_wait > self.PING_INTERVAL: - check_wait = 0 - self._check_frontend_alive() + Backend constructor, create needed instances. + """ + self._signaler = Signaler() + self._frontend_pid = frontend_pid + self._frontend_checker = None + self._ongoing_defers = [] + self._zmq_connection = TxZmqREPConnection( + self.BIND_ADDR, self._process_request) def _check_frontend_alive(self): """ @@ -160,25 +160,27 @@ class Backend(object): for d in self._ongoing_defers: d.cancel() + logger.debug("Stopping the Twisted reactor...") reactor.stop() - logger.debug("Twisted reactor stopped.") def run(self): """ Start the ZMQ server and run the loop to handle requests. """ self._signaler.start() - self._do_work.set() - threads.deferToThread(self._worker) + self._frontend_checker = task.LoopingCall(self._check_frontend_alive) + self._frontend_checker.start(self.PING_INTERVAL) + logger.debug("Starting Twisted reactor.") reactor.run() + logger.debug("Finished Twisted reactor.") def stop(self): """ Stop the server and the zmq request parse loop. """ - logger.debug("STOP received.") + logger.debug("Stopping the backend...") self._signaler.stop() - self._do_work.clear() + self._frontend_checker.stop() threads.deferToThread(self._stop_reactor) def _process_request(self, request_json): |