From 1eae3af1906f95d9d2d76c205b61799d248f0e71 Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Tue, 8 Jul 2014 16:45:43 -0300 Subject: Add base communication framework. --- src/leap/bitmask/backend/backend.py | 196 ++++++++++++++++++++++++++++++ src/leap/bitmask/backend/backend_proxy.py | 142 ++++++++++++++++++++++ src/leap/bitmask/backend/signaler.py | 140 +++++++++++++++++++++ src/leap/bitmask/backend/signaler_qt.py | 105 ++++++++++++++++ src/leap/bitmask/backend/utils.py | 43 +++++++ 5 files changed, 626 insertions(+) create mode 100644 src/leap/bitmask/backend/backend.py create mode 100644 src/leap/bitmask/backend/backend_proxy.py create mode 100644 src/leap/bitmask/backend/signaler.py create mode 100644 src/leap/bitmask/backend/signaler_qt.py create mode 100644 src/leap/bitmask/backend/utils.py (limited to 'src/leap') diff --git a/src/leap/bitmask/backend/backend.py b/src/leap/bitmask/backend/backend.py new file mode 100644 index 00000000..26c547b6 --- /dev/null +++ b/src/leap/bitmask/backend/backend.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python +# encoding: utf-8 +import json +import threading +import time + +from twisted.internet import defer, reactor, threads + +import zmq +from zmq.auth.thread import ThreadAuthenticator + +from leap.bitmask.backend.api import API +from leap.bitmask.backend.utils import get_backend_certificates +from leap.bitmask.backend.signaler import Signaler + +import logging +logger = logging.getLogger(__name__) + + +class Backend(object): + """ + Backend server. + Receives signals from backend_proxy and emit signals if needed. + """ + PORT = '5556' + BIND_ADDR = "tcp://127.0.0.1:%s" % PORT + + def __init__(self): + """ + Backend constructor, create needed instances. + """ + self._signaler = Signaler() + + self._do_work = threading.Event() # used to stop the worker thread. + self._zmq_socket = None + + self._ongoing_defers = [] + self._init_zmq() + + def _init_zmq(self): + """ + Configure the zmq components and connection. + """ + context = zmq.Context() + socket = context.socket(zmq.REP) + + # Start an authenticator for this context. + auth = ThreadAuthenticator(context) + auth.start() + auth.allow('127.0.0.1') + + # Tell authenticator to use the certificate in a directory + auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY) + public, secret = get_backend_certificates() + socket.curve_publickey = public + socket.curve_secretkey = secret + socket.curve_server = True # must come before bind + + socket.bind(self.BIND_ADDR) + + self._zmq_socket = socket + + def _worker(self): + """ + Receive requests and send it to process. + + Note: we use a simple while since is less resource consuming than a + Twisted's LoopingCall. + """ + while self._do_work.is_set(): + # Wait for next request from client + try: + request = self._zmq_socket.recv(zmq.NOBLOCK) + self._zmq_socket.send("OK") + logger.debug("Received request: '{0}'".format(request)) + self._process_request(request) + except zmq.ZMQError as e: + if e.errno != zmq.EAGAIN: + raise + time.sleep(0.01) + + def _stop_reactor(self): + """ + Stop the Twisted reactor, but first wait a little for some threads to + complete their work. + + Note: this method needs to be run in a different thread so the + time.sleep() does not block and other threads can finish. + i.e.: + use threads.deferToThread(this_method) instead of this_method() + """ + wait_max = 5 # seconds + wait_step = 0.5 + wait = 0 + while self._ongoing_defers and wait < wait_max: + time.sleep(wait_step) + wait += wait_step + msg = "Waiting for running threads to finish... {0}/{1}" + msg = msg.format(wait, wait_max) + logger.debug(msg) + + # after a timeout we shut down the existing threads. + for d in self._ongoing_defers: + d.cancel() + + reactor.stop() + logger.debug("Twisted reactor stopped.") + + def run(self): + """ + Start the ZMQ server and run the loop to handle requests. + """ + self._signaler.start() + self._do_work.set() + threads.deferToThread(self._worker) + reactor.run() + + def stop(self): + """ + Stop the server and the zmq request parse loop. + """ + logger.debug("STOP received.") + self._signaler.stop() + self._do_work.clear() + threads.deferToThread(self._stop_reactor) + + def _process_request(self, request_json): + """ + Process a request and call the according method with the given + parameters. + + :param request_json: a json specification of a request. + :type request_json: str + """ + try: + # request = zmq.utils.jsonapi.loads(request_json) + # We use stdlib's json to ensure that we get unicode strings + request = json.loads(request_json) + api_method = request['api_method'] + kwargs = request['arguments'] or None + except Exception as e: + msg = "Malformed JSON data in Backend request '{0}'. Exc: {1!r}" + msg = msg.format(request_json, e) + msg = msg.format(request_json) + logger.critical(msg) + raise + + if api_method not in API: + logger.error("Invalid API call '{0}'".format(api_method)) + return + + self._run_in_thread(api_method, kwargs) + + def _run_in_thread(self, api_method, kwargs): + """ + Run the method name in a thread with the given arguments. + + :param api_method: the callable name to run in a thread. + :type api_method: str + :param kwargs: the arguments dict that will be sent to the callable. + :type kwargs: tuple + """ + func = getattr(self, api_method) + + method = func + if kwargs is not None: + method = lambda: func(**kwargs) + + logger.debug("Running method: '{0}' " + "with args: '{1}' in a thread".format(api_method, kwargs)) + + # run the action in a thread and keep track of it + d = threads.deferToThread(method) + d.addCallback(self._done_action, d) + d.addErrback(self._done_action, d) + self._ongoing_defers.append(d) + + def _done_action(self, failure, d): + """ + Remove the defer from the ongoing list. + + :param failure: the failure that triggered the errback. + None if no error. + :type failure: twisted.python.failure.Failure + :param d: defer to remove + :type d: twisted.internet.defer.Deferred + """ + if failure is not None: + if failure.check(defer.CancelledError): + logger.debug("A defer was cancelled.") + else: + logger.error("There was a failure - {0!r}".format(failure)) + logger.error(failure.getTraceback()) + + if d in self._ongoing_defers: + self._ongoing_defers.remove(d) diff --git a/src/leap/bitmask/backend/backend_proxy.py b/src/leap/bitmask/backend/backend_proxy.py new file mode 100644 index 00000000..ae9cf5b1 --- /dev/null +++ b/src/leap/bitmask/backend/backend_proxy.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python +# encoding: utf-8 +import functools +import Queue +import threading +import time + +import zmq + +from leap.bitmask.backend.api import API, STOP_REQUEST +from leap.bitmask.backend.utils import get_backend_certificates + +import logging +logger = logging.getLogger(__name__) + + +class BackendProxy(object): + """ + The BackendProxy handles calls from the GUI and forwards (through ZMQ) + to the backend. + """ + PORT = '5556' + SERVER = "tcp://localhost:%s" % PORT + + def __init__(self): + self._socket = None + + # initialize ZMQ stuff: + context = zmq.Context() + logger.debug("Connecting to server...") + socket = context.socket(zmq.REQ) + + # public, secret = zmq.curve_keypair() + client_keys = zmq.curve_keypair() + socket.curve_publickey = client_keys[0] + socket.curve_secretkey = client_keys[1] + + # The client must know the server's public key to make a CURVE + # connection. + public, _ = get_backend_certificates() + socket.curve_serverkey = public + + socket.setsockopt(zmq.RCVTIMEO, 1000) + socket.connect(self.SERVER) + self._socket = socket + + self._call_queue = Queue.Queue() + self._worker_caller = threading.Thread(target=self._worker) + self._worker_caller.start() + + def _worker(self): + """ + Worker loop that processes the Queue of pending requests to do. + """ + while True: + try: + request = self._call_queue.get(block=False) + # break the loop after sending the 'stop' action to the + # backend. + if request == STOP_REQUEST: + break + + self._send_request(request) + except Queue.Empty: + pass + time.sleep(0.01) + + logger.debug("BackendProxy worker stopped.") + + def _api_call(self, *args, **kwargs): + """ + Call the `api_method` method in backend (through zmq). + + :param kwargs: named arguments to forward to the backend api method. + :type kwargs: dict + + Note: is mandatory to have the kwarg 'api_method' defined. + """ + if args: + # Use a custom message to be more clear about using kwargs *only* + raise Exception("All arguments need to be kwargs!") + + api_method = kwargs.pop('api_method', None) + if api_method is None: + raise Exception("Missing argument, no method name specified.") + + request = { + 'api_method': api_method, + 'arguments': kwargs, + } + + try: + request_json = zmq.utils.jsonapi.dumps(request) + except Exception as e: + msg = ("Error serializing request into JSON.\n" + "Exception: {0} Data: {1}") + msg = msg.format(e, request) + logger.critical(msg) + raise + + # queue the call in order to handle the request in a thread safe way. + self._call_queue.put(request_json) + + if api_method == STOP_REQUEST: + self._call_queue.put(STOP_REQUEST) + + def _send_request(self, request): + """ + Send the given request to the server. + This is used from a thread safe loop in order to avoid sending a + request without receiving a response from a previous one. + + :param request: the request to send. + :type request: str + """ + logger.debug("Sending request to backend: {0}".format(request)) + self._socket.send(request) + + try: + # Get the reply. + response = self._socket.recv() + msg = "Received reply for '{0}' -> '{1}'".format(request, response) + logger.debug(msg) + except zmq.error.Again as e: + msg = "Timeout error contacting backend. {0!r}".format(e) + logger.critical(msg) + + def __getattribute__(self, name): + """ + This allows the user to do: + bp = BackendProxy() + bp.some_method() + + Just by having defined 'some_method' in the API + + :param name: the attribute name that is requested. + :type name: str + """ + if name in API: + return functools.partial(self._api_call, api_method=name) + else: + return object.__getattribute__(self, name) diff --git a/src/leap/bitmask/backend/signaler.py b/src/leap/bitmask/backend/signaler.py new file mode 100644 index 00000000..d96015f4 --- /dev/null +++ b/src/leap/bitmask/backend/signaler.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python +# encoding: utf-8 +import Queue +import threading +import time + +import zmq + +from leap.bitmask.backend.api import SIGNALS +from leap.bitmask.backend.utils import get_frontend_certificates + +import logging +logger = logging.getLogger(__name__) + + +class Signaler(object): + """ + Signaler client. + Receives signals from the backend and sends to the signaling server. + """ + PORT = "5667" + SERVER = "tcp://localhost:%s" % PORT + + def __init__(self): + """ + Initialize the ZMQ socket to talk to the signaling server. + """ + context = zmq.Context() + logger.debug("Connecting to signaling server...") + socket = context.socket(zmq.REQ) + + # public, secret = zmq.curve_keypair() + client_keys = zmq.curve_keypair() + socket.curve_publickey = client_keys[0] + socket.curve_secretkey = client_keys[1] + + # The client must know the server's public key to make a CURVE + # connection. + public, _ = get_frontend_certificates() + socket.curve_serverkey = public + + socket.setsockopt(zmq.RCVTIMEO, 1000) + socket.connect(self.SERVER) + self._socket = socket + + self._signal_queue = Queue.Queue() + + self._do_work = threading.Event() # used to stop the worker thread. + self._worker_signaler = threading.Thread(target=self._worker) + + def __getattribute__(self, name): + """ + This allows the user to do: + S = Signaler() + S.SOME_SIGNAL + + Just by having defined 'some_signal' in _SIGNALS + + :param name: the attribute name that is requested. + :type name: str + """ + if name in SIGNALS: + return name + else: + return object.__getattribute__(self, name) + + def signal(self, signal, data=None): + """ + Sends a signal to the signaling server. + + :param signal: the signal to send. + :type signal: str + """ + if signal not in SIGNALS: + raise Exception("Unknown signal: '{0}'".format(signal)) + + request = { + 'signal': signal, + 'data': data, + } + + try: + request_json = zmq.utils.jsonapi.dumps(request) + except Exception as e: + msg = ("Error serializing request into JSON.\n" + "Exception: {0} Data: {1}") + msg = msg.format(e, request) + logger.critical(msg) + raise + + # queue the call in order to handle the request in a thread safe way. + self._signal_queue.put(request_json) + + def _worker(self): + """ + Worker loop that processes the Queue of pending requests to do. + """ + while self._do_work.is_set(): + try: + request = self._signal_queue.get(block=False) + self._send_request(request) + except Queue.Empty: + pass + time.sleep(0.01) + + logger.debug("Signaler thread stopped.") + + def start(self): + """ + Start the Signaler worker. + """ + self._do_work.set() + self._worker_signaler.start() + + def stop(self): + """ + Stop the Signaler worker. + """ + self._do_work.clear() + + def _send_request(self, request): + """ + Send the given request to the server. + This is used from a thread safe loop in order to avoid sending a + request without receiving a response from a previous one. + + :param request: the request to send. + :type request: str + """ + logger.debug("Signaling '{0}'".format(request)) + self._socket.send(request) + + # Get the reply. + try: + response = self._socket.recv() + msg = "Received reply for '{0}' -> '{1}'".format(request, response) + logger.debug(msg) + except zmq.error.Again as e: + msg = "Timeout error contacting signaler. {0!r}".format(e) + logger.critical(msg) diff --git a/src/leap/bitmask/backend/signaler_qt.py b/src/leap/bitmask/backend/signaler_qt.py new file mode 100644 index 00000000..cf49df91 --- /dev/null +++ b/src/leap/bitmask/backend/signaler_qt.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python +# encoding: utf-8 +import threading +import time + +from PySide import QtCore + +import zmq +from zmq.auth.thread import ThreadAuthenticator + +from leap.bitmask.backend.api import SIGNALS +from leap.bitmask.backend.utils import get_frontend_certificates + +import logging +logger = logging.getLogger(__name__) + + +class SignalerQt(QtCore.QThread): + """ + Signaling server. + Receives signals from the signaling client and emit Qt signals for the GUI. + """ + PORT = "5667" + BIND_ADDR = "tcp://127.0.0.1:%s" % PORT + + def __init__(self): + QtCore.QThread.__init__(self) + self._do_work = threading.Event() + self._do_work.set() + + def run(self): + """ + Start a loop to process the ZMQ requests from the signaler client. + """ + logger.debug("Running SignalerQt loop") + context = zmq.Context() + socket = context.socket(zmq.REP) + + # Start an authenticator for this context. + auth = ThreadAuthenticator(context) + auth.start() + auth.allow('127.0.0.1') + + # Tell authenticator to use the certificate in a directory + auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY) + public, secret = get_frontend_certificates() + socket.curve_publickey = public + socket.curve_secretkey = secret + socket.curve_server = True # must come before bind + + socket.bind(self.BIND_ADDR) + + while self._do_work.is_set(): + # Wait for next request from client + try: + request = socket.recv(zmq.NOBLOCK) + logger.debug("Received request: '{0}'".format(request)) + socket.send("OK") + self._process_request(request) + except zmq.ZMQError as e: + if e.errno != zmq.EAGAIN: + raise + time.sleep(0.01) + + logger.debug("SignalerQt thread stopped.") + + def stop(self): + """ + Stop the SignalerQt blocking loop. + """ + self._do_work.clear() + + def _process_request(self, request_json): + """ + Process a request and call the according method with the given + parameters. + + :param request_json: a json specification of a request. + :type request_json: str + """ + try: + request = zmq.utils.jsonapi.loads(request_json) + signal = request['signal'] + data = request['data'] + except Exception as e: + msg = "Malformed JSON data in Signaler request '{0}'. Exc: {1!r}" + msg = msg.format(request_json, e) + logger.critical(msg) + raise + + if signal not in SIGNALS: + logger.error("Unknown signal received, '{0}'".format(signal)) + return + + try: + qt_signal = getattr(self, signal) + except Exception: + logger.warning("Signal not implemented, '{0}'".format(signal)) + return + + logger.debug("Emitting '{0}'".format(signal)) + if data is None: + qt_signal.emit() + else: + qt_signal.emit(data) diff --git a/src/leap/bitmask/backend/utils.py b/src/leap/bitmask/backend/utils.py new file mode 100644 index 00000000..5fe59a62 --- /dev/null +++ b/src/leap/bitmask/backend/utils.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +# encoding: utf-8 +import os +import shutil + +import zmq.auth + +from leap.bitmask.util import get_path_prefix + +KEYS_DIR = os.path.join(get_path_prefix(), 'leap', 'zmq_certificates') + + +def generate_certificates(): + """ + Generate client and server CURVE certificate files. + """ + # Create directory for certificates, remove old content if necessary + if os.path.exists(KEYS_DIR): + shutil.rmtree(KEYS_DIR) + os.mkdir(KEYS_DIR) + + # create new keys in certificates dir + # public_file, secret_file = create_certificates(...) + zmq.auth.create_certificates(KEYS_DIR, "frontend") + zmq.auth.create_certificates(KEYS_DIR, "backend") + + +def get_frontend_certificates(): + """ + Return the frontend's public and secret certificates. + """ + frontend_secret_file = os.path.join(KEYS_DIR, "frontend.key_secret") + public, secret = zmq.auth.load_certificate(frontend_secret_file) + return public, secret + + +def get_backend_certificates(base_dir='.'): + """ + Return the backend's public and secret certificates. + """ + backend_secret_file = os.path.join(KEYS_DIR, "backend.key_secret") + public, secret = zmq.auth.load_certificate(backend_secret_file) + return public, secret -- cgit v1.2.3