summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Alejandro <ivanalejandro0@gmail.com>2014-07-08 16:45:43 -0300
committerIvan Alejandro <ivanalejandro0@gmail.com>2014-07-14 12:11:50 -0300
commit1eae3af1906f95d9d2d76c205b61799d248f0e71 (patch)
tree9fc8f3d0a6b01963068878582e9520933544023a
parent3795dedd26fc239e143ca2a29b7e16d433f964ba (diff)
Add base communication framework.
-rw-r--r--src/leap/bitmask/backend/backend.py196
-rw-r--r--src/leap/bitmask/backend/backend_proxy.py142
-rw-r--r--src/leap/bitmask/backend/signaler.py140
-rw-r--r--src/leap/bitmask/backend/signaler_qt.py105
-rw-r--r--src/leap/bitmask/backend/utils.py43
5 files changed, 626 insertions, 0 deletions
diff --git a/src/leap/bitmask/backend/backend.py b/src/leap/bitmask/backend/backend.py
new file mode 100644
index 00000000..26c547b6
--- /dev/null
+++ b/src/leap/bitmask/backend/backend.py
@@ -0,0 +1,196 @@
+#!/usr/bin/env python
+# encoding: utf-8
+import json
+import threading
+import time
+
+from twisted.internet import defer, reactor, threads
+
+import zmq
+from zmq.auth.thread import ThreadAuthenticator
+
+from leap.bitmask.backend.api import API
+from leap.bitmask.backend.utils import get_backend_certificates
+from leap.bitmask.backend.signaler import Signaler
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+class Backend(object):
+ """
+ Backend server.
+ Receives signals from backend_proxy and emit signals if needed.
+ """
+ PORT = '5556'
+ BIND_ADDR = "tcp://127.0.0.1:%s" % PORT
+
+ def __init__(self):
+ """
+ Backend constructor, create needed instances.
+ """
+ self._signaler = Signaler()
+
+ self._do_work = threading.Event() # used to stop the worker thread.
+ self._zmq_socket = None
+
+ self._ongoing_defers = []
+ self._init_zmq()
+
+ def _init_zmq(self):
+ """
+ Configure the zmq components and connection.
+ """
+ context = zmq.Context()
+ socket = context.socket(zmq.REP)
+
+ # Start an authenticator for this context.
+ auth = ThreadAuthenticator(context)
+ auth.start()
+ auth.allow('127.0.0.1')
+
+ # Tell authenticator to use the certificate in a directory
+ auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
+ public, secret = get_backend_certificates()
+ socket.curve_publickey = public
+ socket.curve_secretkey = secret
+ socket.curve_server = True # must come before bind
+
+ socket.bind(self.BIND_ADDR)
+
+ self._zmq_socket = socket
+
+ def _worker(self):
+ """
+ Receive requests and send it to process.
+
+ Note: we use a simple while since is less resource consuming than a
+ Twisted's LoopingCall.
+ """
+ while self._do_work.is_set():
+ # Wait for next request from client
+ try:
+ request = self._zmq_socket.recv(zmq.NOBLOCK)
+ self._zmq_socket.send("OK")
+ logger.debug("Received request: '{0}'".format(request))
+ self._process_request(request)
+ except zmq.ZMQError as e:
+ if e.errno != zmq.EAGAIN:
+ raise
+ time.sleep(0.01)
+
+ def _stop_reactor(self):
+ """
+ Stop the Twisted reactor, but first wait a little for some threads to
+ complete their work.
+
+ Note: this method needs to be run in a different thread so the
+ time.sleep() does not block and other threads can finish.
+ i.e.:
+ use threads.deferToThread(this_method) instead of this_method()
+ """
+ wait_max = 5 # seconds
+ wait_step = 0.5
+ wait = 0
+ while self._ongoing_defers and wait < wait_max:
+ time.sleep(wait_step)
+ wait += wait_step
+ msg = "Waiting for running threads to finish... {0}/{1}"
+ msg = msg.format(wait, wait_max)
+ logger.debug(msg)
+
+ # after a timeout we shut down the existing threads.
+ for d in self._ongoing_defers:
+ d.cancel()
+
+ reactor.stop()
+ logger.debug("Twisted reactor stopped.")
+
+ def run(self):
+ """
+ Start the ZMQ server and run the loop to handle requests.
+ """
+ self._signaler.start()
+ self._do_work.set()
+ threads.deferToThread(self._worker)
+ reactor.run()
+
+ def stop(self):
+ """
+ Stop the server and the zmq request parse loop.
+ """
+ logger.debug("STOP received.")
+ self._signaler.stop()
+ self._do_work.clear()
+ threads.deferToThread(self._stop_reactor)
+
+ def _process_request(self, request_json):
+ """
+ Process a request and call the according method with the given
+ parameters.
+
+ :param request_json: a json specification of a request.
+ :type request_json: str
+ """
+ try:
+ # request = zmq.utils.jsonapi.loads(request_json)
+ # We use stdlib's json to ensure that we get unicode strings
+ request = json.loads(request_json)
+ api_method = request['api_method']
+ kwargs = request['arguments'] or None
+ except Exception as e:
+ msg = "Malformed JSON data in Backend request '{0}'. Exc: {1!r}"
+ msg = msg.format(request_json, e)
+ msg = msg.format(request_json)
+ logger.critical(msg)
+ raise
+
+ if api_method not in API:
+ logger.error("Invalid API call '{0}'".format(api_method))
+ return
+
+ self._run_in_thread(api_method, kwargs)
+
+ def _run_in_thread(self, api_method, kwargs):
+ """
+ Run the method name in a thread with the given arguments.
+
+ :param api_method: the callable name to run in a thread.
+ :type api_method: str
+ :param kwargs: the arguments dict that will be sent to the callable.
+ :type kwargs: tuple
+ """
+ func = getattr(self, api_method)
+
+ method = func
+ if kwargs is not None:
+ method = lambda: func(**kwargs)
+
+ logger.debug("Running method: '{0}' "
+ "with args: '{1}' in a thread".format(api_method, kwargs))
+
+ # run the action in a thread and keep track of it
+ d = threads.deferToThread(method)
+ d.addCallback(self._done_action, d)
+ d.addErrback(self._done_action, d)
+ self._ongoing_defers.append(d)
+
+ def _done_action(self, failure, d):
+ """
+ Remove the defer from the ongoing list.
+
+ :param failure: the failure that triggered the errback.
+ None if no error.
+ :type failure: twisted.python.failure.Failure
+ :param d: defer to remove
+ :type d: twisted.internet.defer.Deferred
+ """
+ if failure is not None:
+ if failure.check(defer.CancelledError):
+ logger.debug("A defer was cancelled.")
+ else:
+ logger.error("There was a failure - {0!r}".format(failure))
+ logger.error(failure.getTraceback())
+
+ if d in self._ongoing_defers:
+ self._ongoing_defers.remove(d)
diff --git a/src/leap/bitmask/backend/backend_proxy.py b/src/leap/bitmask/backend/backend_proxy.py
new file mode 100644
index 00000000..ae9cf5b1
--- /dev/null
+++ b/src/leap/bitmask/backend/backend_proxy.py
@@ -0,0 +1,142 @@
+#!/usr/bin/env python
+# encoding: utf-8
+import functools
+import Queue
+import threading
+import time
+
+import zmq
+
+from leap.bitmask.backend.api import API, STOP_REQUEST
+from leap.bitmask.backend.utils import get_backend_certificates
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+class BackendProxy(object):
+ """
+ The BackendProxy handles calls from the GUI and forwards (through ZMQ)
+ to the backend.
+ """
+ PORT = '5556'
+ SERVER = "tcp://localhost:%s" % PORT
+
+ def __init__(self):
+ self._socket = None
+
+ # initialize ZMQ stuff:
+ context = zmq.Context()
+ logger.debug("Connecting to server...")
+ socket = context.socket(zmq.REQ)
+
+ # public, secret = zmq.curve_keypair()
+ client_keys = zmq.curve_keypair()
+ socket.curve_publickey = client_keys[0]
+ socket.curve_secretkey = client_keys[1]
+
+ # The client must know the server's public key to make a CURVE
+ # connection.
+ public, _ = get_backend_certificates()
+ socket.curve_serverkey = public
+
+ socket.setsockopt(zmq.RCVTIMEO, 1000)
+ socket.connect(self.SERVER)
+ self._socket = socket
+
+ self._call_queue = Queue.Queue()
+ self._worker_caller = threading.Thread(target=self._worker)
+ self._worker_caller.start()
+
+ def _worker(self):
+ """
+ Worker loop that processes the Queue of pending requests to do.
+ """
+ while True:
+ try:
+ request = self._call_queue.get(block=False)
+ # break the loop after sending the 'stop' action to the
+ # backend.
+ if request == STOP_REQUEST:
+ break
+
+ self._send_request(request)
+ except Queue.Empty:
+ pass
+ time.sleep(0.01)
+
+ logger.debug("BackendProxy worker stopped.")
+
+ def _api_call(self, *args, **kwargs):
+ """
+ Call the `api_method` method in backend (through zmq).
+
+ :param kwargs: named arguments to forward to the backend api method.
+ :type kwargs: dict
+
+ Note: is mandatory to have the kwarg 'api_method' defined.
+ """
+ if args:
+ # Use a custom message to be more clear about using kwargs *only*
+ raise Exception("All arguments need to be kwargs!")
+
+ api_method = kwargs.pop('api_method', None)
+ if api_method is None:
+ raise Exception("Missing argument, no method name specified.")
+
+ request = {
+ 'api_method': api_method,
+ 'arguments': kwargs,
+ }
+
+ try:
+ request_json = zmq.utils.jsonapi.dumps(request)
+ except Exception as e:
+ msg = ("Error serializing request into JSON.\n"
+ "Exception: {0} Data: {1}")
+ msg = msg.format(e, request)
+ logger.critical(msg)
+ raise
+
+ # queue the call in order to handle the request in a thread safe way.
+ self._call_queue.put(request_json)
+
+ if api_method == STOP_REQUEST:
+ self._call_queue.put(STOP_REQUEST)
+
+ def _send_request(self, request):
+ """
+ Send the given request to the server.
+ This is used from a thread safe loop in order to avoid sending a
+ request without receiving a response from a previous one.
+
+ :param request: the request to send.
+ :type request: str
+ """
+ logger.debug("Sending request to backend: {0}".format(request))
+ self._socket.send(request)
+
+ try:
+ # Get the reply.
+ response = self._socket.recv()
+ msg = "Received reply for '{0}' -> '{1}'".format(request, response)
+ logger.debug(msg)
+ except zmq.error.Again as e:
+ msg = "Timeout error contacting backend. {0!r}".format(e)
+ logger.critical(msg)
+
+ def __getattribute__(self, name):
+ """
+ This allows the user to do:
+ bp = BackendProxy()
+ bp.some_method()
+
+ Just by having defined 'some_method' in the API
+
+ :param name: the attribute name that is requested.
+ :type name: str
+ """
+ if name in API:
+ return functools.partial(self._api_call, api_method=name)
+ else:
+ return object.__getattribute__(self, name)
diff --git a/src/leap/bitmask/backend/signaler.py b/src/leap/bitmask/backend/signaler.py
new file mode 100644
index 00000000..d96015f4
--- /dev/null
+++ b/src/leap/bitmask/backend/signaler.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python
+# encoding: utf-8
+import Queue
+import threading
+import time
+
+import zmq
+
+from leap.bitmask.backend.api import SIGNALS
+from leap.bitmask.backend.utils import get_frontend_certificates
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+class Signaler(object):
+ """
+ Signaler client.
+ Receives signals from the backend and sends to the signaling server.
+ """
+ PORT = "5667"
+ SERVER = "tcp://localhost:%s" % PORT
+
+ def __init__(self):
+ """
+ Initialize the ZMQ socket to talk to the signaling server.
+ """
+ context = zmq.Context()
+ logger.debug("Connecting to signaling server...")
+ socket = context.socket(zmq.REQ)
+
+ # public, secret = zmq.curve_keypair()
+ client_keys = zmq.curve_keypair()
+ socket.curve_publickey = client_keys[0]
+ socket.curve_secretkey = client_keys[1]
+
+ # The client must know the server's public key to make a CURVE
+ # connection.
+ public, _ = get_frontend_certificates()
+ socket.curve_serverkey = public
+
+ socket.setsockopt(zmq.RCVTIMEO, 1000)
+ socket.connect(self.SERVER)
+ self._socket = socket
+
+ self._signal_queue = Queue.Queue()
+
+ self._do_work = threading.Event() # used to stop the worker thread.
+ self._worker_signaler = threading.Thread(target=self._worker)
+
+ def __getattribute__(self, name):
+ """
+ This allows the user to do:
+ S = Signaler()
+ S.SOME_SIGNAL
+
+ Just by having defined 'some_signal' in _SIGNALS
+
+ :param name: the attribute name that is requested.
+ :type name: str
+ """
+ if name in SIGNALS:
+ return name
+ else:
+ return object.__getattribute__(self, name)
+
+ def signal(self, signal, data=None):
+ """
+ Sends a signal to the signaling server.
+
+ :param signal: the signal to send.
+ :type signal: str
+ """
+ if signal not in SIGNALS:
+ raise Exception("Unknown signal: '{0}'".format(signal))
+
+ request = {
+ 'signal': signal,
+ 'data': data,
+ }
+
+ try:
+ request_json = zmq.utils.jsonapi.dumps(request)
+ except Exception as e:
+ msg = ("Error serializing request into JSON.\n"
+ "Exception: {0} Data: {1}")
+ msg = msg.format(e, request)
+ logger.critical(msg)
+ raise
+
+ # queue the call in order to handle the request in a thread safe way.
+ self._signal_queue.put(request_json)
+
+ def _worker(self):
+ """
+ Worker loop that processes the Queue of pending requests to do.
+ """
+ while self._do_work.is_set():
+ try:
+ request = self._signal_queue.get(block=False)
+ self._send_request(request)
+ except Queue.Empty:
+ pass
+ time.sleep(0.01)
+
+ logger.debug("Signaler thread stopped.")
+
+ def start(self):
+ """
+ Start the Signaler worker.
+ """
+ self._do_work.set()
+ self._worker_signaler.start()
+
+ def stop(self):
+ """
+ Stop the Signaler worker.
+ """
+ self._do_work.clear()
+
+ def _send_request(self, request):
+ """
+ Send the given request to the server.
+ This is used from a thread safe loop in order to avoid sending a
+ request without receiving a response from a previous one.
+
+ :param request: the request to send.
+ :type request: str
+ """
+ logger.debug("Signaling '{0}'".format(request))
+ self._socket.send(request)
+
+ # Get the reply.
+ try:
+ response = self._socket.recv()
+ msg = "Received reply for '{0}' -> '{1}'".format(request, response)
+ logger.debug(msg)
+ except zmq.error.Again as e:
+ msg = "Timeout error contacting signaler. {0!r}".format(e)
+ logger.critical(msg)
diff --git a/src/leap/bitmask/backend/signaler_qt.py b/src/leap/bitmask/backend/signaler_qt.py
new file mode 100644
index 00000000..cf49df91
--- /dev/null
+++ b/src/leap/bitmask/backend/signaler_qt.py
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+# encoding: utf-8
+import threading
+import time
+
+from PySide import QtCore
+
+import zmq
+from zmq.auth.thread import ThreadAuthenticator
+
+from leap.bitmask.backend.api import SIGNALS
+from leap.bitmask.backend.utils import get_frontend_certificates
+
+import logging
+logger = logging.getLogger(__name__)
+
+
+class SignalerQt(QtCore.QThread):
+ """
+ Signaling server.
+ Receives signals from the signaling client and emit Qt signals for the GUI.
+ """
+ PORT = "5667"
+ BIND_ADDR = "tcp://127.0.0.1:%s" % PORT
+
+ def __init__(self):
+ QtCore.QThread.__init__(self)
+ self._do_work = threading.Event()
+ self._do_work.set()
+
+ def run(self):
+ """
+ Start a loop to process the ZMQ requests from the signaler client.
+ """
+ logger.debug("Running SignalerQt loop")
+ context = zmq.Context()
+ socket = context.socket(zmq.REP)
+
+ # Start an authenticator for this context.
+ auth = ThreadAuthenticator(context)
+ auth.start()
+ auth.allow('127.0.0.1')
+
+ # Tell authenticator to use the certificate in a directory
+ auth.configure_curve(domain='*', location=zmq.auth.CURVE_ALLOW_ANY)
+ public, secret = get_frontend_certificates()
+ socket.curve_publickey = public
+ socket.curve_secretkey = secret
+ socket.curve_server = True # must come before bind
+
+ socket.bind(self.BIND_ADDR)
+
+ while self._do_work.is_set():
+ # Wait for next request from client
+ try:
+ request = socket.recv(zmq.NOBLOCK)
+ logger.debug("Received request: '{0}'".format(request))
+ socket.send("OK")
+ self._process_request(request)
+ except zmq.ZMQError as e:
+ if e.errno != zmq.EAGAIN:
+ raise
+ time.sleep(0.01)
+
+ logger.debug("SignalerQt thread stopped.")
+
+ def stop(self):
+ """
+ Stop the SignalerQt blocking loop.
+ """
+ self._do_work.clear()
+
+ def _process_request(self, request_json):
+ """
+ Process a request and call the according method with the given
+ parameters.
+
+ :param request_json: a json specification of a request.
+ :type request_json: str
+ """
+ try:
+ request = zmq.utils.jsonapi.loads(request_json)
+ signal = request['signal']
+ data = request['data']
+ except Exception as e:
+ msg = "Malformed JSON data in Signaler request '{0}'. Exc: {1!r}"
+ msg = msg.format(request_json, e)
+ logger.critical(msg)
+ raise
+
+ if signal not in SIGNALS:
+ logger.error("Unknown signal received, '{0}'".format(signal))
+ return
+
+ try:
+ qt_signal = getattr(self, signal)
+ except Exception:
+ logger.warning("Signal not implemented, '{0}'".format(signal))
+ return
+
+ logger.debug("Emitting '{0}'".format(signal))
+ if data is None:
+ qt_signal.emit()
+ else:
+ qt_signal.emit(data)
diff --git a/src/leap/bitmask/backend/utils.py b/src/leap/bitmask/backend/utils.py
new file mode 100644
index 00000000..5fe59a62
--- /dev/null
+++ b/src/leap/bitmask/backend/utils.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+# encoding: utf-8
+import os
+import shutil
+
+import zmq.auth
+
+from leap.bitmask.util import get_path_prefix
+
+KEYS_DIR = os.path.join(get_path_prefix(), 'leap', 'zmq_certificates')
+
+
+def generate_certificates():
+ """
+ Generate client and server CURVE certificate files.
+ """
+ # Create directory for certificates, remove old content if necessary
+ if os.path.exists(KEYS_DIR):
+ shutil.rmtree(KEYS_DIR)
+ os.mkdir(KEYS_DIR)
+
+ # create new keys in certificates dir
+ # public_file, secret_file = create_certificates(...)
+ zmq.auth.create_certificates(KEYS_DIR, "frontend")
+ zmq.auth.create_certificates(KEYS_DIR, "backend")
+
+
+def get_frontend_certificates():
+ """
+ Return the frontend's public and secret certificates.
+ """
+ frontend_secret_file = os.path.join(KEYS_DIR, "frontend.key_secret")
+ public, secret = zmq.auth.load_certificate(frontend_secret_file)
+ return public, secret
+
+
+def get_backend_certificates(base_dir='.'):
+ """
+ Return the backend's public and secret certificates.
+ """
+ backend_secret_file = os.path.join(KEYS_DIR, "backend.key_secret")
+ public, secret = zmq.auth.load_certificate(backend_secret_file)
+ return public, secret