diff options
42 files changed, 2615 insertions, 2097 deletions
@@ -6,3 +6,4 @@ dist/ build/ MANIFEST +_trial_temp @@ -0,0 +1,7 @@ +Tomás Touceda <chiiph@leap.se> +Kali Kaneko <kali@leap.se> +drebs <drebs@leap.se> +Ivan Alejandro <ivanalejandro0@gmail.com> +Ruben Pollan <meskio@sindominio.net> +Bruno Wagner <bwgpro@gmail.com> +Victor Shyba <victor.shyba@gmail.com> @@ -1,3 +1,29 @@ +0.4.2 Aug 26, 2015: + o Add http request timeout. Related to #7234. + o Add a flag to disable events framework. Closes: #7259 + o Allow passing callback to HTTP client. + o Bugfix: do not add a port string to non-tcp addresses. + o Add close method for http agent. + o Fix code style and tests. + o Bugfix: HTTP timeout was not being cleared on abort. + +0.4.1 Jul 10, 2015: + o Fix regexp to allow ipc protocol in zmq sockets. Closes: #7089. + o Remove extraneous data from events logs. Closes #7130. + o Make https client use Twisted SSL validation and adds a reuse by default + behavior on connection pool + +0.4.0 Jun 1, 2015: + o Modify leap.common.events to use ZMQ. Closes #6359. + o Fix time comparison between local and UTC times that caused the VPN + certificates not being correctly downloaded on time. Closes #6994. + o Add a HTTPClient the twisted way. + +0.3.10 Jan 26, 2015: + o Consider different possibilities for tmpdir. Related to #6631. + o Add support for deferreds to memoize_method decorator + o Extract the environment set up and tear down for tests + 0.3.9 Jul 18, 2014: o Include pemfile in the package data. Closes #5897. o Look for bundled cacert.pem in the Resources dir for OSX. @@ -15,7 +15,6 @@ A collection of shared utils used by the several python LEAP subprojects. Library dependencies -------------------- -* ``protobuf-compiler`` * ``libssl-dev`` Python dependencies diff --git a/pkg/generate_wheels.sh b/pkg/generate_wheels.sh new file mode 100755 index 0000000..4cdc34e --- /dev/null +++ b/pkg/generate_wheels.sh @@ -0,0 +1,14 @@ +#!/bin/sh +# Generate wheels for dependencies +# For convenience, dirspec is allowed with insecure flags enabled. +# Use at your own risk. + +if [ "$WHEELHOUSE" = "" ]; then + WHEELHOUSE=$HOME/wheelhouse +fi + +pip wheel --wheel-dir $WHEELHOUSE pip +pip wheel --wheel-dir $WHEELHOUSE --allow-external dirspec --allow-unverified dirspec -r pkg/requirements.pip +if [ -f pkg/requirements-testing.pip ]; then + pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements-testing.pip +fi diff --git a/pkg/pip_install_requirements.sh b/pkg/pip_install_requirements.sh new file mode 100755 index 0000000..6d8ed28 --- /dev/null +++ b/pkg/pip_install_requirements.sh @@ -0,0 +1,86 @@ +#!/bin/bash +# Update pip and install LEAP base/testing requirements. +# For convenience, $insecure_packages are allowed with insecure flags enabled. +# Use at your own risk. +# See $usage for help + +insecure_packages="dirspec" +leap_wheelhouse=https://lizard.leap.se/wheels + +show_help() { + usage="Usage: $0 [--testing] [--use-leap-wheels]\n --testing\t\tInstall dependencies from requirements-testing.pip\n +\t\t\tOtherwise, it will install requirements.pip\n +--use-leap-wheels\tUse wheels from leap.se" + echo -e $usage + + exit 1 +} + +process_arguments() { + testing=false + use_leap_wheels=false + + while [ "$#" -gt 0 ]; do + # From http://stackoverflow.com/a/31443098 + case "$1" in + --help) show_help;; + --testing) testing=true; shift 1;; + --use-leap-wheels) use_leap_wheels=true; shift 1;; + + -h) show_help;; + -*) echo "unknown option: $1" >&2; exit 1;; + esac + done +} + +return_wheelhouse() { + if $use_leap_wheels ; then + WHEELHOUSE=$leap_wheelhouse + elif [ "$WHEELHOUSE" = "" ]; then + WHEELHOUSE=$HOME/wheelhouse + fi + + # Tested with bash and zsh + if [[ $WHEELHOUSE != http* && ! -d "$WHEELHOUSE" ]]; then + mkdir $WHEELHOUSE + fi + + echo "$WHEELHOUSE" +} + +return_install_options() { + wheelhouse=`return_wheelhouse` + install_options="-U --find-links=$wheelhouse" + if $use_leap_wheels ; then + install_options="$install_options --trusted-host lizard.leap.se" + fi + + echo $install_options +} + +return_insecure_flags() { + for insecure_package in $insecure_packages; do + flags="$flags --allow-external $insecure_package --allow-unverified $insecure_package" + done + + echo $flags +} + +return_packages() { + if $testing ; then + packages="-r pkg/requirements-testing.pip" + else + packages="-r pkg/requirements.pip" + fi + + echo $packages +} + +process_arguments $@ +install_options=`return_install_options` +insecure_flags=`return_insecure_flags` +packages=`return_packages` + +pip install -U wheel +pip install $install_options pip +pip install $install_options $insecure_flags $packages diff --git a/pkg/requirements-testing.pip b/pkg/requirements-testing.pip new file mode 100644 index 0000000..c5a3ad0 --- /dev/null +++ b/pkg/requirements-testing.pip @@ -0,0 +1,3 @@ +mock +setuptools-trial +pep8 diff --git a/pkg/requirements.pip b/pkg/requirements.pip index c89fd19..02fb189 100644 --- a/pkg/requirements.pip +++ b/pkg/requirements.pip @@ -1,8 +1,8 @@ jsonschema #<=0.8 -- are we done with this conflict? dirspec -protobuf>=2.4.1 -protobuf.socketrpc pyopenssl python-dateutil +pyzmq>=14.4.1 +txzmq>=0.7.3 #autopep8 -- ??? diff --git a/pkg/tools/get_authors.sh b/pkg/tools/get_authors.sh new file mode 100755 index 0000000..0169bb1 --- /dev/null +++ b/pkg/tools/get_authors.sh @@ -0,0 +1,2 @@ +#!/bin/sh +git log --format='%aN <%aE>' | awk '{arr[$0]++} END{for (i in arr){print arr[i], i;}}' | sort -rn | cut -d' ' -f2- diff --git a/pkg/utils.py b/pkg/utils.py index deace14..521cd4e 100644 --- a/pkg/utils.py +++ b/pkg/utils.py @@ -58,9 +58,9 @@ def parse_requirements(reqfiles=['requirements.txt', if re.match(r'\s*-e\s+', line): pass # do not try to do anything with externals on vcs - #requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1', - #line)) - # http://foo.bar/baz/foobar/zipball/master#egg=foobar + # requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1', + # line)) + # http://foo.bar/baz/foobar/zipball/master#egg=foobar elif re.match(r'\s*https?:', line): requirements.append(re.sub(r'\s*https?:.*#egg=(.*)$', r'\1', line)) @@ -3,3 +3,13 @@ tag_build = tag_date = 0 tag_svn_revision = 0 +[aliases] +test = trial + +[pep8] +exclude = versioneer.py,_version.py,*.egg,build,docs +ignore = E731 + +[flake8] +exclude = versioneer.py,_version.py,*.egg,build,docs +ignore = E731 @@ -138,4 +138,11 @@ setup( tests_require=tests_requirements, include_package_data=True, zip_safe=False, + + extras_require={ + # needed for leap.common.http + # service_identity needed for propper hostname identification, + # see http://twistedmatrix.com/documents/current/core/howto/ssl.html + 'Twisted': ["Twisted>=14.0.2", "service_identity", "zope.interface"] + }, ) diff --git a/src/leap/common/__init__.py b/src/leap/common/__init__.py index 5619900..383e198 100644 --- a/src/leap/common/__init__.py +++ b/src/leap/common/__init__.py @@ -4,6 +4,7 @@ from leap.common import certs from leap.common import check from leap.common import files from leap.common import events +from ._version import get_versions logger = logging.getLogger(__name__) @@ -11,11 +12,10 @@ try: import pygeoip HAS_GEOIP = True except ImportError: - #logger.debug('PyGeoIP not found. Disabled Geo support.') + # logger.debug('PyGeoIP not found. Disabled Geo support.') HAS_GEOIP = False __all__ = ["certs", "check", "files", "events"] -from ._version import get_versions __version__ = get_versions()['version'] del get_versions diff --git a/src/leap/common/_version.py b/src/leap/common/_version.py index e926143..3500492 100644 --- a/src/leap/common/_version.py +++ b/src/leap/common/_version.py @@ -1,13 +1,3 @@ -# This file was generated by the `freeze_debianver` command in setup.py -# Using 'versioneer.py' (0.7+) from -# revision-control system data, or from the parent directory name of an -# unpacked source archive. Distribution tarballs contain a pre-generated copy -# of this file. - version_version = '0.3.9' version_full = '3de1eeba83d50793283d65ba4566dd1611ee9d4b' - - -def get_versions(default={}, verbose=False): - return {'version': version_version, 'full': version_full} diff --git a/src/leap/common/ca_bundle.py b/src/leap/common/ca_bundle.py index d8c72a6..2c41d18 100644 --- a/src/leap/common/ca_bundle.py +++ b/src/leap/common/ca_bundle.py @@ -28,6 +28,7 @@ _system = platform.system() IS_MAC = _system == "Darwin" + def where(): """ Return the preferred certificate bundle. diff --git a/src/leap/common/certs.py b/src/leap/common/certs.py index 4fe563b..37ede8e 100644 --- a/src/leap/common/certs.py +++ b/src/leap/common/certs.py @@ -128,22 +128,23 @@ def is_valid_pemfile(cert): return can_load_cert_and_pkey(cert) -def get_cert_time_boundaries(certfile): +def get_cert_time_boundaries(certdata): """ - Returns the time boundaries for the certificate saved in certfile + Return the time boundaries for the given certificate. + The returned values are UTC/GMT time.struct_time objects - :param certfile: path to certificate - :type certfile: str + :param certdata: the certificate contents + :type certdata: str :rtype: tuple (from, to) """ - cert = get_cert_from_string(certfile) + cert = get_cert_from_string(certdata) leap_assert(cert, 'There was a problem loading the certificate') fromts, tots = (cert.get_notBefore(), cert.get_notAfter()) - from_, to_ = map( - lambda ts: time.gmtime(time.mktime(dateparse(ts).timetuple())), - (fromts, tots)) + from_ = dateparse(fromts).timetuple() + to_ = dateparse(tots).timetuple() + return from_, to_ @@ -177,3 +178,21 @@ def should_redownload(certfile, now=time.gmtime): return True return False + + +def get_compatible_ssl_context_factory(cert_path=None): + import twisted + cert = None + if twisted.version.base() > '14.0.1': + from twisted.web.client import BrowserLikePolicyForHTTPS + from twisted.internet import ssl + if cert_path: + cert = ssl.Certificate.loadPEM(open(cert_path).read()) + policy = BrowserLikePolicyForHTTPS(cert) + return policy + else: + raise Exception((""" + Twisted 14.0.2 is needed in order to have secure + Client Web SSL Contexts, not %s + See: http://twistedmatrix.com/trac/ticket/7647 + """) % (twisted.version.base())) diff --git a/src/leap/common/config/pluggableconfig.py b/src/leap/common/config/pluggableconfig.py index 8535fa6..1a98427 100644 --- a/src/leap/common/config/pluggableconfig.py +++ b/src/leap/common/config/pluggableconfig.py @@ -27,7 +27,7 @@ import urlparse import jsonschema -#from leap.base.util.translations import LEAPTranslatable +# from leap.base.util.translations import LEAPTranslatable from leap.common.check import leap_assert @@ -163,8 +163,8 @@ class TranslatableType(object): return data # LEAPTranslatable(data) # needed? we already have an extended dict... - #def get_prep_value(self, data): - #return dict(data) + # def get_prep_value(self, data): + # return dict(data) class URIType(object): @@ -283,9 +283,13 @@ class PluggableConfig(object): except BaseException, e: raise TypeCastException( "Could not coerce %s, %s, " - "to format %s: %s" % (key, value, - _ftype.__class__.__name__, - e)) + "to format %s: %s" % ( + key, + value, + _ftype.__class__.__name__, + e + ) + ) return config @@ -303,9 +307,12 @@ class PluggableConfig(object): except BaseException, e: raise TypeCastException( "Could not serialize %s, %s, " - "by format %s: %s" % (key, value, - _ftype.__class__.__name__, - e)) + "by format %s: %s" % ( + key, + value, + _ftype.__class__.__name__, + e) + ) else: config[key] = value return config @@ -435,7 +442,7 @@ class PluggableConfig(object): content = self.deserialize(string) if not string and fromfile is not None: - #import ipdb;ipdb.set_trace() + # import ipdb;ipdb.set_trace() content = self.deserialize(fromfile=fromfile) if not content: diff --git a/src/leap/common/config/tests/test_baseconfig.py b/src/leap/common/config/tests/test_baseconfig.py index 8bdf4d0..e17e82d 100644 --- a/src/leap/common/config/tests/test_baseconfig.py +++ b/src/leap/common/config/tests/test_baseconfig.py @@ -29,21 +29,21 @@ from mock import Mock # reduced eipconfig sample config sample_config = { "gateways": [ - { - "capabilities": { - "adblock": False, - "transport": ["openvpn"], - "user_ips": False - }, - "host": "host.dev.example.org", - }, { - "capabilities": { - "adblock": False, - "transport": ["openvpn"], - "user_ips": False - }, - "host": "host2.dev.example.org", - } + { + "capabilities": { + "adblock": False, + "transport": ["openvpn"], + "user_ips": False + }, + "host": "host.dev.example.org", + }, { + "capabilities": { + "adblock": False, + "transport": ["openvpn"], + "user_ips": False + }, + "host": "host2.dev.example.org", + } ], "default_language": "en", "languages": [ diff --git a/src/leap/common/decorators.py b/src/leap/common/decorators.py index 2ef6711..4b07ea9 100644 --- a/src/leap/common/decorators.py +++ b/src/leap/common/decorators.py @@ -22,6 +22,13 @@ import datetime import functools import logging +try: + from twisted.internet import defer +except ImportError: + class defer: + class Deferred: + pass + logger = logging.getLogger(__name__) @@ -59,6 +66,7 @@ class _memoized(object): # consume a huge amount of memory. self.cache = {} self.cache_ts = {} + self.is_deferred = {} def __call__(self, *args, **kwargs): """ @@ -67,28 +75,7 @@ class _memoized(object): :tyoe args: tuple :type kwargs: dict """ - def ret_or_raise(value): - """ - Returns the value except if it is an exception, - in which case it's raised. - """ - if isinstance(value, Exception): - raise value - return value - - if self.is_method: - # forget about `self` as key - key_args = args[1:] - else: - key_args = args - - if self.ignore_kwargs is True: - key = key_args - else: - key = (key_args, frozenset( - [(k, v) for k, v in kwargs.items() - if k not in self.ignore_kwargs])) - + key = self._build_key(*args, **kwargs) if not isinstance(key, collections.Hashable): # uncacheable. a list, for instance. # better to not cache than blow up. @@ -97,9 +84,9 @@ class _memoized(object): if key in self.cache: if self._is_cache_still_valid(key): - value = self.cache[key] logger.debug("Got value from cache...") - return ret_or_raise(value) + value = self._get_value(key) + return self._ret_or_raise(value) else: logger.debug("Cache is invalid, evaluating again...") @@ -109,9 +96,52 @@ class _memoized(object): except Exception as exc: logger.error("Exception while calling function: %r" % (exc,)) value = exc - self.cache[key] = value + + if isinstance(value, defer.Deferred): + value.addBoth(self._store_deferred_value(key)) + else: + self.cache[key] = value + self.is_deferred[key] = False self.cache_ts[key] = datetime.datetime.now() - return ret_or_raise(value) + return self._ret_or_raise(value) + + def _build_key(self, *args, **kwargs): + """ + Build key from the function arguments + """ + if self.is_method: + # forget about `self` as key + key_args = args[1:] + else: + key_args = args + + if self.ignore_kwargs is True: + key = key_args + else: + key = (key_args, frozenset( + [(k, v) for k, v in kwargs.items() + if k not in self.ignore_kwargs])) + return key + + def _get_value(self, key): + """ + Get a value form cache for a key + """ + if self.is_deferred[key]: + value = self.cache[key] + # it produces an errback if value is Failure + return defer.succeed(value) + else: + return self.cache[key] + + def _ret_or_raise(self, value): + """ + Returns the value except if it is an exception, + in which case it's raised. + """ + if isinstance(value, Exception): + raise value + return value def _is_cache_still_valid(self, key, now=datetime.datetime.now): """ @@ -131,6 +161,16 @@ class _memoized(object): delta = datetime.timedelta(seconds=self.CACHE_INVALIDATION_DELTA) return (now() - cached_ts) < delta + def _store_deferred_value(self, key): + """ + Returns a callback to store values from deferreds + """ + def callback(value): + self.cache[key] = value + self.is_deferred[key] = True + return value + return callback + def __repr__(self): """ Return the function's docstring. @@ -144,17 +184,20 @@ class _memoized(object): return functools.partial(self.__call__, obj) -def memoized_method(function=None, ignore_kwargs=None): +def memoized_method(function=None, ignore_kwargs=None, + invalidation=_memoized.CACHE_INVALIDATION_DELTA): """ Wrap _memoized to allow for deferred calling :type function: callable, or None. :type ignore_kwargs: None, True or tuple. + :type invalidation: int seconds. """ if function: - return _memoized(function, is_method=True) + return _memoized(function, is_method=True, invalidation=invalidation) else: def wrapper(function): return _memoized( - function, ignore_kwargs=ignore_kwargs, is_method=True) + function, ignore_kwargs=ignore_kwargs, is_method=True, + invalidation=invalidation) return wrapper diff --git a/src/leap/common/events/README.rst b/src/leap/common/events/README.rst index 2e7f254..f455cc8 100644 --- a/src/leap/common/events/README.rst +++ b/src/leap/common/events/README.rst @@ -1,19 +1,83 @@ Events mechanism ================ -The events mechanism allows for clients to send signal events to each -other by means of a centralized server. Clients can register with the -server to receive signals of certain types, and they can also send signals to -the server that will then redistribute these signals to registered clients. +The events mechanism allows for clients to send events to each other by means +of a centralized server. Clients can register with the server to receive +events of certain types, and they can also send events to the server that will +then redistribute these events to registered clients. -Listening daemons ------------------ +ZMQ connections and events redistribution +----------------------------------------- -Both clients and the server listen for incoming messages by using a -listening daemon that runs in its own thread. The server daemon has to be -started explicitly, while clients daemon will be started whenever a -client registers with the server to receive messages. +Clients and server use ZMQ connection patterns to communicate. Clients can +push events to the server, and may subscribe to events published by the +server. The server in turn pulls events from clients and publishes them to +subscribed clients. + +Clients connect to the server's zmq pub socket, and register to specific +events indicating which callbacks should be executed when that event is +received: + + + EventsServer + .------------. + |PULL PUB| + '------------' + ^^ + || + reg(1, cbk1) |'--------------. reg(2, cbk2) + | | + | | + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + + +A client that wants to send an event connects to the server's zmq pull socket +and pushes the event to the server. The server then redistributes it to all +clients subscribed to that event. + + + EventsServer + .------------. + |PULL---->PUB| + '------------' + ^ |. + | |. +sig(1, 'foo') .----------------' |'............... + | | . + | v . + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + | + v + cbk1(1, 'foo') + + +Any client may emit or subscribe to an event. ZMQ will manage sockets and +reuse the connection whenever it can. + + + EventsServer + .------------. + |PULL---->PUB| + '------------' + ^ .| + | .| +sig(2, 'bar') .-----------------' .'--------------. + | . | + | . v + .------------. .------------. .------------. + |PUSH SUB| |PUSH SUB| |PUSH SUB| + '------------' '------------' '------------' + EventsClient EventsClient EventsClient + | + v + cbk2(2, 'bar') How to use it @@ -22,32 +86,27 @@ How to use it To start the events server: >>> from leap.common.events import server ->>> server.ensure_server(port=8090) +>>> server.ensure_server( + emit_addr="tcp://127.0.0.1:9000", + reg_addr="tcp://127.0.0.1:9001") -To register a callback to be called when a given signal is raised: +To register a callback to be called when a given event is raised: ->>> from leap.common.events import ( ->>> register, ->>> events_pb2 as proto, ->>> ) +>>> from leap.common.events import register +>>> from leap.common.events import catalog >>> ->>> def mycallback(sigreq): ->>> print str(sigreq) +>>> def mycbk(event, *content): +>>> print "%s, %s" (str(event), str(content)) >>> ->>> events.register(signal=proto.CLIENT_UID, callback=mycallback) +>>> register(catalog.CLIENT_UID, callback=mycbk) -To signal an event: +To emit an event: ->>> from leap.common.events import ( ->>> signal, ->>> events_pb2 as proto, ->>> ) ->>> signal(proto.CLIENT_UID) +>>> from leap.common.events import emit +>>> from leap.common.events import catalog +>>> emit(catalog.CLIENT_UID) Adding events ------------- -* Add the new event under enum ``Event`` in ``events.proto`` -* Compile the new protocolbuffers file:: - - make +To add a new event, just add it to ``catalog.py``. diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py index 0cc6573..87ed8ae 100644 --- a/src/leap/common/events/__init__.py +++ b/src/leap/common/events/__init__.py @@ -14,189 +14,199 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - """ -This is an events mechanism that uses a server to allow for signaling of -events between clients. +This is an events mechanism that uses a server to allow for emitting events +between clients. Application components should use the interface available in this file to -register callbacks to be executed upon receiving specific signals, and to send -signals to other components. +register callbacks to be executed upon receiving specific events, and to send +events to other components. -To register a callback to be executed when a specific event is signaled, use +To register a callback to be executed when a specific event is emitted, use leap.common.events.register(): >>> from leap.common.events import register ->>> from leap.common.events import events_pb2 as proto ->>> register(proto.CLIENT_UID, lambda req: do_something(req)) - -To signal an event, use leap.common.events.signal(): - ->>> from leap.common.events import signal ->>> from leap.common.events import events_pb2 as proto ->>> signal(proto.CLIENT_UID) - - -NOTE ABOUT SYNC/ASYNC REQUESTS: - -Clients always communicate with the server, and never between themselves. -When a client registers a callback for an event, the callback is saved locally -in the client and the server stores the client socket port in a list -associated with that event. When a client signals an event, the server -forwards the signal to all registered client ports, and then each client -executes its callbacks associated with that event locally. - -Each RPC call from a client to the server is followed by a response from the -server to the client. Calls to register() and signal() (and all other RPC -calls) can be synchronous or asynchronous meaning if they will or not wait for -the server's response before returning. - -This mechanism was built on top of protobuf.socketrpc, and because of this RPC -calls are made synchronous or asynchronous in the following way: - - * If RPC calls receive a parameter called `reqcbk`, then the call is made - asynchronous. That means that: - - - an eventual `timeout` parameter is not used, - - the call returns immediatelly with value None, and - - the `reqcbk` callback is executed asynchronously upon the arrival of - a response from the server. +>>> from leap.common.events import catalog +>>> register(catalog.CLIENT_UID, lambda sig, content: do_something(content)) - * Otherwise, if the `reqcbk` parameter is None, then the call is made in a - synchronous manner: +To emit an event, use leap.common.events.emit(): - - if a response from server arrives within `timeout` milliseconds, the - RPC call returns it; - - otherwise, the call returns None. +>>> from leap.common.events import emit +>>> from leap.common.events import catalog +>>> emit(catalog.CLIENT_UID) """ - import logging -import socket +import argparse + +from leap.common.events import client +from leap.common.events import server +from leap.common.events.flags import set_events_enabled + +from leap.common.events import catalog -from leap.common.events import ( - events_pb2 as proto, - server, - client, - daemon, -) +__all__ = [ + "register", + "unregister", + "emit", + "catalog", + "set_events_enabled" +] logger = logging.getLogger(__name__) -def register(signal, callback, uid=None, replace=False, reqcbk=None, - timeout=1000): +def register(event, callback, uid=None, replace=False): """ - Register a callback to be called when the given signal is received. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param callback: the callback to be called when the signal is received - :type callback: function - :param uid: a unique id for the callback - :type uid: int - :param replace: should an existent callback with same uid be replaced? + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. :type replace: bool - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(leap.common.events.events_pb2.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - return client.register(signal, callback, uid, replace, reqcbk, timeout) + :return: The callback uid. + :rtype: str -def unregister(signal, uid=None, reqcbk=None, timeout=1000): - """ - Unregister a callback. - - If C{uid} is specified, unregisters only the callback identified by that - unique id. Otherwise, unregisters all callbacks registered for C{signal}. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param uid: a unique id for the callback - :type uid: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None + :raises CallbackAlreadyRegistered: when there's already a callback + identified by the given uid and replace is False. """ - return client.unregister(signal, uid, reqcbk, timeout) + return client.register(event, callback, uid, replace) -def signal(signal, content="", mac_method="", mac="", reqcbk=None, - timeout=1000): +def unregister(event, uid=None): """ - Send `signal` event to events server. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param content: the contents of the event signal - :type content: str - :param mac_method: the method used to auth mac - :type mac_method: str - :param mac: the content of the auth mac - :type mac: str - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.SignalRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - return client.signal(signal, content, mac_method, mac, reqcbk, timeout) + Unregister callbacks for an event. -def ping_client(port, reqcbk=None, timeout=1000): - """ - Ping a client running in C{port}. - - :param port: the port in which the client should be listening - :type port: int - :param reqcbk: a callback to be called when a response from client is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str """ - return client.ping(port, reqcbk=reqcbk, timeout=timeout) + return client.unregister(event, uid) -def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000): +def emit(event, *content): """ - Ping the server. - - :param port: the port in which server should be listening - :type port: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list """ - return server.ping(port, reqcbk=reqcbk, timeout=timeout) + return client.emit(event, *content) + + +if __name__ == "__main__": + + def _echo(event, *content): + print "Received event: (%s, %s)" % (event, content) + + def _parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--debug", "-d", action="store_true", + help="print debug information") + + subparsers = parser.add_subparsers(dest="command") + + # server options + server_parser = subparsers.add_parser( + "server", help="Run an events server.") + server_parser.add_argument( + "--emit-addr", + help="The address in which to listen for events", + default=server.EMIT_ADDR) + server_parser.add_argument( + "--reg-addr", + help="The address in which to listen for registration for events.", + default=server.REG_ADDR) + + # client options + client_parser = subparsers.add_parser( + "client", help="Run an events client.") + client_parser.add_argument( + "--emit-addr", + help="The address in which to emit events.", + default=server.EMIT_ADDR) + client_parser.add_argument( + "--reg-addr", + help="The address in which to register for events.", + default=server.REG_ADDR) + group = client_parser.add_mutually_exclusive_group(required=True) + group.add_argument('--reg', help="register an event") + group.add_argument('--emit', help="send an event") + client_parser.add_argument( + '--content', help="the content of the event", default=None) + + # txclient options + txclient_parser = subparsers.add_parser( + "txclient", help="Run an events twisted client.") + txclient_parser.add_argument( + "--emit-addr", + help="The address in which to emit events.", + default=server.EMIT_ADDR) + txclient_parser.add_argument( + "--reg-addr", + help="The address in which to register for events.", + default=server.REG_ADDR) + group = txclient_parser.add_mutually_exclusive_group(required=True) + group.add_argument('--reg', help="register an event") + group.add_argument('--emit', help="send an event") + txclient_parser.add_argument( + '--content', help="the content of the event", default=None) + + return parser.parse_args() + + args = _parse_args() + + if args.debug: + logging.basicConfig(level=logging.DEBUG) + + if args.command == "server": + # run server + server.ensure_server(emit_addr=args.emit_addr, reg_addr=args.reg_addr) + from twisted.internet import reactor + reactor.run() + elif args.command == "client": + if args.reg: + event = getattr(catalog, args.reg) + # run client and register to a signal + register(event, _echo) + # make sure we stop on CTRL+C + import signal + signal.signal( + signal.SIGINT, lambda sig, frame: client.shutdown()) + # wait until client thread dies + import time + while client.EventsClientThread.instance().is_alive(): + time.sleep(0.1) + if args.emit: + # run client and emit a signal + event = getattr(catalog, args.emit) + emit(event, args.content) + client.shutdown() + elif args.command == "txclient": + from leap.common.events import txclient + register = txclient.register + emit = txclient.emit + if args.reg: + event = getattr(catalog, args.reg) + # run client and register to a signal + register(event, _echo) + from twisted.internet import reactor + reactor.run() + if args.emit: + # run client and emit a signal + event = getattr(catalog, args.emit) + emit(event, args.content) diff --git a/src/leap/common/events/catalog.py b/src/leap/common/events/catalog.py new file mode 100644 index 0000000..8bddd2c --- /dev/null +++ b/src/leap/common/events/catalog.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# catalog.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful", +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +Events catalog. +""" + + +EVENTS = [ + "CLIENT_SESSION_ID", + "CLIENT_UID", + "IMAP_CLIENT_LOGIN", + "IMAP_SERVICE_FAILED_TO_START", + "IMAP_SERVICE_STARTED", + "IMAP_UNHANDLED_ERROR", + "KEYMANAGER_DONE_UPLOADING_KEYS", + "KEYMANAGER_FINISHED_KEY_GENERATION", + "KEYMANAGER_KEY_FOUND", + "KEYMANAGER_KEY_NOT_FOUND", + "KEYMANAGER_LOOKING_FOR_KEY", + "KEYMANAGER_STARTED_KEY_GENERATION", + "MAIL_FETCHED_INCOMING", + "MAIL_MSG_DECRYPTED", + "MAIL_MSG_DELETED_INCOMING", + "MAIL_MSG_PROCESSING", + "MAIL_MSG_SAVED_LOCALLY", + "MAIL_UNREAD_MESSAGES", + "RAISE_WINDOW", + "SMTP_CONNECTION_LOST", + "SMTP_END_ENCRYPT_AND_SIGN", + "SMTP_END_SIGN", + "SMTP_RECIPIENT_ACCEPTED_ENCRYPTED", + "SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED", + "SMTP_RECIPIENT_REJECTED", + "SMTP_SEND_MESSAGE_ERROR", + "SMTP_SEND_MESSAGE_START", + "SMTP_SEND_MESSAGE_SUCCESS", + "SMTP_SERVICE_FAILED_TO_START", + "SMTP_SERVICE_STARTED", + "SMTP_START_ENCRYPT_AND_SIGN", + "SMTP_START_SIGN", + "SOLEDAD_CREATING_KEYS", + "SOLEDAD_DONE_CREATING_KEYS", + "SOLEDAD_DONE_DATA_SYNC", + "SOLEDAD_DONE_DOWNLOADING_KEYS", + "SOLEDAD_DONE_UPLOADING_KEYS", + "SOLEDAD_DOWNLOADING_KEYS", + "SOLEDAD_INVALID_AUTH_TOKEN", + "SOLEDAD_NEW_DATA_TO_SYNC", + "SOLEDAD_SYNC_RECEIVE_STATUS", + "SOLEDAD_SYNC_SEND_STATUS", + "SOLEDAD_UPLOADING_KEYS", + "UPDATER_DONE_UPDATING", + "UPDATER_NEW_UPDATES", +] + + +class Event(object): + + def __init__(self, label): + self.label = label + + def __repr__(self): + return '<Event: %s>' % self.label + + def __str__(self): + return self.label + + +# create local variables based on the event list above +lcl = locals() +for event in EVENTS: + lcl[event] = Event(event) diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py index 83f18e0..e085f5b 100644 --- a/src/leap/common/events/client.py +++ b/src/leap/common/events/client.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # client.py -# Copyright (C) 2013 LEAP +# Copyright (C) 2013, 2014, 2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -21,309 +21,532 @@ Clients are the communicating parties of the events mechanism. They communicate by sending messages to a server, which in turn redistributes messages to other clients. -When a client registers a callback for a given signal, it also tells the -server that it wants to be notified whenever signals of that type are sent by +When a client registers a callback for a given event, it also tells the +server that it wants to be notified whenever events of that type are sent by some other client. """ - import logging +import collections +import uuid +import threading +import time +import pickle +import os + +from abc import ABCMeta +from abc import abstractmethod + +import zmq +from zmq.eventloop import zmqstream +from zmq.eventloop import ioloop + +# XXX some distros don't package libsodium, so we have to be prepared for +# absence of zmq.auth +try: + import zmq.auth +except ImportError: + pass +from leap.common.config import get_path_prefix +from leap.common.zmq_utils import zmq_has_curve +from leap.common.zmq_utils import maybe_create_and_get_certificates +from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX -from protobuf.socketrpc import RpcService - -from leap.common.events import events_pb2 as proto -from leap.common.events import server -from leap.common.events import daemon -from leap.common.events import mac_auth +from leap.common.events.errors import CallbackAlreadyRegisteredError +from leap.common.events.server import EMIT_ADDR +from leap.common.events.server import REG_ADDR +from leap.common.events import flags +from leap.common.events import catalog logger = logging.getLogger(__name__) -# the `registered_callbacks` dictionary below should have the following -# format: -# -# { event_signal: [ (uid, callback), ... ], ... } -# -registered_callbacks = {} +_emit_addr = EMIT_ADDR +_reg_addr = REG_ADDR + +def configure_client(emit_addr, reg_addr): + global _emit_addr, _reg_addr + logger.debug("Configuring client with addresses: (%s, %s)" % + (emit_addr, reg_addr)) + _emit_addr = emit_addr + _reg_addr = reg_addr -class CallbackAlreadyRegistered(Exception): + +class EventsClient(object): """ - Raised when trying to register an already registered callback. + A singleton client for the events mechanism. """ - pass + __metaclass__ = ABCMeta -def ensure_client_daemon(): - """ - Ensure the client daemon is running and listening for incoming - messages. + _instance = None + _instance_lock = threading.Lock() - :return: the daemon instance - :rtype: EventsClientDaemon - """ - import time - daemon = EventsClientDaemon.ensure(0) - logger.debug('ensure client daemon') + def __init__(self, emit_addr, reg_addr): + """ + Initialize the events client. + """ + logger.debug("Creating client instance.") + self._callbacks = collections.defaultdict(dict) + self._emit_addr = emit_addr + self._reg_addr = reg_addr - # Because we use a random port we want to wait until a port is assigned to - # local client daemon. + @property + def callbacks(self): + return self._callbacks - while not (EventsClientDaemon.get_instance() and - EventsClientDaemon.get_instance().get_port()): - time.sleep(0.1) - return daemon + @classmethod + def instance(cls): + """ + Return a singleton EventsClient instance. + """ + with cls._instance_lock: + if cls._instance is None: + cls._instance = cls(_emit_addr, _reg_addr) + return cls._instance + def register(self, event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: Event + :param callback: The callback to be executed. + :type callback: callable(event, *content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + logger.debug("Subscribing to event: %s" % event) + if not uid: + uid = uuid.uuid4() + elif uid in self._callbacks[event] and not replace: + raise CallbackAlreadyRegisteredError() + self._callbacks[event][uid] = callback + self._subscribe(str(event)) + return uid + + def unregister(self, event, uid=None): + """ + Unregister callbacks for an event. -def register(signal, callback, uid=None, replace=False, reqcbk=None, - timeout=1000): - """ - Registers a callback to be called when a specific signal event is - received. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param callback: the callback to be called when the signal is received - :type callback: function(leap.common.events.events_pb2.SignalRequest) - :param uid: a unique id for the callback - :type uid: int - :param replace: should an existent callback with same uid be replaced? - :type replace: bool - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.RegisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - Might raise a CallbackAlreadyRegistered exception if there's already a - callback identified by the given uid and replace is False. - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - ensure_client_daemon() # so we can receive registered signals - # register callback locally - if signal not in registered_callbacks: - registered_callbacks[signal] = [] - cbklist = registered_callbacks[signal] - - # TODO should check that the callback has the right - # number of arguments. - - if uid and filter(lambda (x, y): x == uid, cbklist): - if not replace: - raise CallbackAlreadyRegistered() + If uid is not None, then only the callback identified by the given uid + is removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str + """ + if not uid: + logger.debug( + "Unregistering all callbacks from event %s." % event) + self._callbacks[event] = {} else: - registered_callbacks[signal] = filter(lambda(x, y): x != uid, - cbklist) - registered_callbacks[signal].append((uid, callback)) - # register callback on server - request = proto.RegisterRequest() - request.event = signal - request.port = EventsClientDaemon.get_instance().get_port() - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(proto.EventsServerService_Stub, - server.SERVER_PORT, 'localhost') - logger.debug( - "Sending registration request to server on port %s: %s", - server.SERVER_PORT, - str(request)[:40]) - return service.register(request, callback=reqcbk, timeout=timeout) - - -def unregister(signal, uid=None, reqcbk=None, timeout=1000): - """ - Unregister a callback. - - If C{uid} is specified, unregisters only the callback identified by that - unique id. Otherwise, unregisters all callbacks - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param uid: a unique id for the callback - :type uid: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls or None if no callback is registered for that signal or - uid. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - if signal not in registered_callbacks or not registered_callbacks[signal]: - logger.warning("No callback registered for signal %d." % signal) - return None - # unregister callback locally - cbklist = registered_callbacks[signal] - if uid is not None: - if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []: - logger.warning("No callback registered for uid %d." % st) - return None - registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist) - else: - # exclude all callbacks for given signal - registered_callbacks[signal] = [] - # unregister port in server if there are no more callbacks for this signal - if not registered_callbacks[signal]: - request = proto.UnregisterRequest() - request.event = signal - request.port = EventsClientDaemon.get_instance().get_port() - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(proto.EventsServerService_Stub, - server.SERVER_PORT, 'localhost') - logger.info( - "Sending unregistration request to server on port %s: %s", - server.SERVER_PORT, - str(request)[:40]) - return service.unregister(request, callback=reqcbk, timeout=timeout) - - -def signal(signal, content="", mac_method="", mac="", reqcbk=None, - timeout=1000): - """ - Send `signal` event to events server. - - Will timeout after timeout ms if response has not been received. The - timeout arg is only used for asynch requests. If a reqcbk callback has - been supplied the timeout arg is not used. The response value will be - returned for a synch request but nothing will be returned for an asynch - request. - - :param signal: the signal that causes the callback to be launched - :type signal: int (see the `events.proto` file) - :param content: the contents of the event signal - :type content: str - :param mac_method: the method used for auth mac - :type mac_method: str - :param mac: the content of the auth mac - :type mac: str - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.SignalRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - request = proto.SignalRequest() - request.event = signal - request.content = content - request.mac_method = mac_method - request.mac = mac - service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT, - 'localhost') - logger.debug("Sending signal to server: %s", str(request)[:40]) - return service.signal(request, callback=reqcbk, timeout=timeout) - - -def ping(port, reqcbk=None, timeout=1000): + logger.debug( + "Unregistering callback %s from event %s." % (uid, event)) + if uid in self._callbacks[event]: + del self._callbacks[event][uid] + if not self._callbacks[event]: + del self._callbacks[event] + self._unsubscribe(str(event)) + + def emit(self, event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + logger.debug("Emitting event: (%s, %s)" % (event, content)) + payload = str(event) + b'\0' + pickle.dumps(content) + self._send(payload) + + def _handle_event(self, event, content): + """ + Handle an incoming event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + logger.debug("Handling event %s..." % event) + for uid in self._callbacks[event]: + callback = self._callbacks[event][uid] + logger.debug("Executing callback %s." % uid) + self._run_callback(callback, event, content) + + @abstractmethod + def _run_callback(self, callback, event, content): + """ + Run a callback. + + :param callback: The callback to be run. + :type callback: callable(event, *content) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + pass + + @abstractmethod + def _subscribe(self, tag): + """ + Subscribe to a tag on the zmq SUB socket. + + :param tag: The tag to be subscribed. + :type tag: str + """ + pass + + @abstractmethod + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. + + :param tag: The tag to be unsubscribed. + :type tag: str + """ + pass + + @abstractmethod + def _send(self, data): + """ + Send data through PUSH socket. + + :param data: The data to be sent. + :type event: str + """ + pass + + def shutdown(self): + self.__class__.reset() + + @classmethod + def reset(cls): + with cls._instance_lock: + cls._instance = None + + +class EventsIOLoop(ioloop.ZMQIOLoop): """ - Ping a client running in C{port}. - - :param port: the port in which the client should be listening - :type port: int - :param reqcbk: a callback to be called when a response from client is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from client for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None + An extension of zmq's ioloop that can wait until there are no callbacks + in the queue before stopping. """ - request = proto.PingRequest() - service = RpcService( - proto.EventsClientService_Stub, - port, - 'localhost') - logger.debug("Pinging a client in port %d..." % port) - return service.ping(request, callback=reqcbk, timeout=timeout) + def stop(self, wait=False): + """ + Stop the I/O loop. -class EventsClientService(proto.EventsClientService): + :param wait: Whether we should wait for callbacks in queue to finish + before stopping. + :type wait: bool + """ + if wait: + # prevent new callbacks from being added + with self._callback_lock: + self._closing = True + # wait until all callbacks have been executed + while self._callbacks: + time.sleep(0.1) + ioloop.ZMQIOLoop.stop(self) + + +class EventsClientThread(threading.Thread, EventsClient): """ - Service for receiving signal events in clients. + A threaded version of the events client. """ - def __init__(self): - proto.EventsClientService.__init__(self) + def __init__(self, emit_addr, reg_addr): + """ + Initialize the events client. + """ + threading.Thread.__init__(self) + EventsClient.__init__(self, emit_addr, reg_addr) + self._lock = threading.Lock() + self._initialized = threading.Event() + self._config_prefix = os.path.join( + get_path_prefix(), "leap", "events") + self._loop = None + self._context = None + self._push = None + self._sub = None + + def _init_zmq(self): + """ + Initialize ZMQ connections. + """ + self._loop = EventsIOLoop() + self._context = zmq.Context() + # connect SUB first, otherwise we might miss some event sent from this + # same client + self._sub = self._zmq_connect_sub() + self._push = self._zmq_connect_push() + + def _zmq_connect(self, socktype, address): + """ + Connect to an address using with a zmq socktype. - def signal(self, controller, request, done): + :param socktype: The ZMQ socket type. + :type socktype: int + :param address: The address to connect to. + :type address: str + + :return: A ZMQ connection stream. + :rtype: ZMQStream + """ + logger.debug("Connecting %s to %s." % (socktype, address)) + socket = self._context.socket(socktype) + # configure curve authentication + if zmq_has_curve(): + public, private = maybe_create_and_get_certificates( + self._config_prefix, "client") + server_public_file = os.path.join( + self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) + socket.curve_publickey = public + socket.curve_secretkey = private + socket.curve_serverkey = server_public + stream = zmqstream.ZMQStream(socket, self._loop) + socket.connect(address) + return stream + + def _zmq_connect_push(self): """ - Receive a signal and run callbacks registered for that signal. + Initialize the client's PUSH connection. - This method is called whenever a signal request is received from - server. + :return: A ZMQ connection stream. + :rtype: ZMQStream + """ + return self._zmq_connect(zmq.PUSH, self._emit_addr) - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.SignalRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback + def _zmq_connect_sub(self): """ - logger.debug('Received signal from server: %s...' % str(request)[:40]) + Initialize the client's SUB connection. - # run registered callbacks - # TODO: verify authentication using mac in incoming message - if request.event in registered_callbacks: - for (_, cbk) in registered_callbacks[request.event]: - # callbacks should be prepared to receive a - # events_pb2.SignalRequest. - cbk(request) + :return: A ZMQ connection stream. + :rtype: ZMQStream + """ + stream = self._zmq_connect(zmq.SUB, self._reg_addr) + stream.on_recv(self._on_recv) + return stream - # send response back to server - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) + def _on_recv(self, msg): + """ + Handle an incoming message in the SUB socket. - def ping(self, controller, request, done): + :param msg: The received message. + :type msg: str """ - Reply to a ping request. + ev_str, content_pickle = msg[0].split(b'\0', 1) # undo txzmq tagging + event = getattr(catalog, ev_str) + content = pickle.loads(content_pickle) + self._handle_event(event, content) - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback + def _subscribe(self, tag): """ - logger.debug("Received ping request, sending response.") - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) + Subscribe from a tag on the zmq SUB socket. + :param tag: The tag to be subscribed. + :type tag: str + """ + self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag) -class EventsClientDaemon(daemon.EventsSingletonDaemon): - """ - A daemon that listens for incoming events from server. - """ + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. - @classmethod - def ensure(cls, port): + :param tag: The tag to be unsubscribed. + :type tag: str + """ + self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag) + + def _send(self, data): """ - Make sure the daemon is running on the given port. + Send data through PUSH socket. - :param port: the port in which the daemon should listen - :type port: int + :param data: The data to be sent. + :type event: str + """ + # add send() as a callback for ioloop so it works between threads + self._loop.add_callback(lambda: self._push.send(data)) + + def _run_callback(self, callback, event, content): + """ + Run a callback. + + :param callback: The callback to be run. + :type callback: callable(event, *content) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + self._loop.add_callback(lambda: callback(event, *content)) - :return: a daemon instance - :rtype: EventsClientDaemon + def register(self, event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: Event + :param callback: The callback to be executed. + :type callback: callable(event, *content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a + callback identified by the given uid and replace is False. + """ + self.ensure_client() + return EventsClient.register( + self, event, callback, uid=uid, replace=replace) + + def unregister(self, event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid + is removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: Event + :param uid: The callback uid. + :type uid: str + """ + self.ensure_client() + EventsClient.unregister(self, event, uid=uid) + + def emit(self, event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + self.ensure_client() + EventsClient.emit(self, event, *content) + + def run(self): + """ + Run the events client. + """ + logger.debug("Starting ioloop.") + self._init_zmq() + self._initialized.set() + self._loop.start() + self._loop.close() + logger.debug("Ioloop finished.") + + def ensure_client(self): + """ + Make sure the events client thread is started. """ - return cls.ensure_service(port, EventsClientService()) + with self._lock: + if flags.EVENTS_ENABLED and not self.is_alive(): + self.daemon = True + self.start() + self._initialized.wait() + + def shutdown(self): + """ + Shutdown the events client thread. + """ + logger.debug("Shutting down client...") + with self._lock: + if self.is_alive(): + self._loop.stop(wait=True) + EventsClient.shutdown(self) + + +def shutdown(): + """ + Shutdown the events client thread. + """ + EventsClientThread.instance().shutdown() + + +def register(event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + if flags.EVENTS_ENABLED: + return EventsClientThread.instance().register( + event, callback, uid=uid, replace=replace) + + +def unregister(event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: str + :param uid: The callback uid. + :type uid: str + """ + if flags.EVENTS_ENABLED: + return EventsClientThread.instance().unregister(event, uid=uid) + + +def emit(event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: str + :param content: The content of the event. + :type content: list + """ + if flags.EVENTS_ENABLED: + return EventsClientThread.instance().emit(event, *content) + + +def instance(): + """ + Return an instance of the events client. + + :return: An instance of the events client. + :rtype: EventsClientThread + """ + return EventsClientThread.instance() diff --git a/src/leap/common/events/daemon.py b/src/leap/common/events/daemon.py deleted file mode 100644 index c4a4189..0000000 --- a/src/leap/common/events/daemon.py +++ /dev/null @@ -1,208 +0,0 @@ -# -*- coding: utf-8 -*- -# daemon.py -# Copyright (C) 2013 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -""" -A singleton daemon for running RPC services using protobuf.socketrpc. -""" - - -import logging -import threading - - -from protobuf.socketrpc.server import ( - SocketRpcServer, - ThreadedTCPServer, - SocketHandler, -) - - -logger = logging.getLogger(__name__) - - -class ServiceAlreadyRunningException(Exception): - """ - Raised whenever a service is already running in this process but someone - attemped to start it in a different port. - """ - - -class EventsRpcServer(SocketRpcServer): - """ - RPC server used in server and client interfaces to receive messages. - """ - - def __init__(self, port, host='localhost'): - """ - Initialize a RPC server. - - :param port: the port in which to listen for incoming messages - :type port: int - :param host: the address to bind to - :type host: str - """ - SocketRpcServer.__init__(self, port, host) - self._server = None - - def run(self): - """ - Run the server. - """ - logger.info('Running server on port %d.' % self.port) - # parent implementation does not hold the server instance, so we do it - # here. - self._server = ThreadedTCPServer((self.host, self.port), - SocketHandler, self) - # if we chose to use a random port, fetch the port number info. - if self.port is 0: - self.port = self._server.socket.getsockname()[1] - self._server.serve_forever() - - def stop(self): - """ - Stop the server. - """ - self._server.shutdown() - - -class EventsSingletonDaemon(threading.Thread): - """ - Singleton class for for launching and terminating a daemon. - - This class is used so every part of the mechanism that needs to listen for - messages can launch its own daemon (thread) to do the job. - """ - - # Singleton instance - __instance = None - - def __new__(cls, *args, **kwargs): - """ - Return a singleton instance if it exists or create and initialize one. - """ - if len(args) is not 2: - raise TypeError("__init__() takes exactly 2 arguments (%d given)" - % len(args)) - if cls.__instance is None: - cls.__instance = object.__new__( - EventsSingletonDaemon) - cls.__initialize(cls.__instance, args[0], args[1]) - return cls.__instance - - @staticmethod - def __initialize(self, port, service): - """ - Initialize a singleton daemon. - - This is a static method disguised as instance method that actually - does the initialization of the daemon instance. - - :param port: the port in which to listen for incoming messages - :type port: int - :param service: the service to provide in this daemon - :type service: google.protobuf.service.Service - """ - threading.Thread.__init__(self) - self._port = port - self._service = service - self._server = EventsRpcServer(self._port) - self._server.registerService(self._service) - self.daemon = True - - def __init__(self): - """ - Singleton placeholder initialization method. - - Initialization is made in __new__ so we can always return the same - instance upon object creation. - """ - pass - - @classmethod - def ensure(cls, port): - """ - Make sure the daemon instance is running. - - Each implementation of this method should call `self.ensure_service` - with the appropriate service from the `events.proto` definitions, and - return the daemon instance. - - :param port: the port in which the daemon should be listening - :type port: int - - :return: a daemon instance - :rtype: EventsSingletonDaemon - """ - raise NotImplementedError(self.ensure) - - @classmethod - def ensure_service(cls, port, service): - """ - Start the singleton instance if not already running. - - Might return ServiceAlreadyRunningException - - :param port: the port in which the daemon should be listening - :type port: int - - :return: a daemon instance - :rtype: EventsSingletonDaemon - """ - daemon = cls(port, service) - if not daemon.is_alive(): - daemon.start() - elif port and port != cls.__instance._port: - # service is running in this process but someone is trying to - # start it in another port - raise ServiceAlreadyRunningException( - "Service is already running in this process on port %d." - % self.__instance._port) - return daemon - - @classmethod - def get_instance(cls): - """ - Retrieve singleton instance of this daemon. - - :return: a daemon instance - :rtype: EventsSingletonDaemon - """ - return cls.__instance - - def run(self): - """ - Run the server. - """ - self._server.run() - - def stop(self): - """ - Stop the daemon. - """ - self._server.stop() - - def get_port(self): - """ - Retrieve the value of the port to which the service running in this - daemon is binded to. - - :return: the port to which the daemon is binded to - :rtype: int - """ - if self._port is 0: - self._port = self._server.port - return self._port diff --git a/src/leap/common/events/mac_auth.py b/src/leap/common/events/errors.py index 49d48f7..58e0014 100644 --- a/src/leap/common/events/mac_auth.py +++ b/src/leap/common/events/errors.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# mac_auth.py -# Copyright (C) 2013 LEAP +# errors.py +# Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -15,17 +15,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Authentication system for events. -This is not implemented yet. -""" - - -class MacMethod(object): +class CallbackAlreadyRegisteredError(Exception): """ - Representation of possible MAC authentication methods. + Raised when trying to register an already registered callback. """ - - MAC_NONE = 'none' - MAC_HMAC = 'hmac' + pass diff --git a/src/leap/common/events/events.proto b/src/leap/common/events/events.proto deleted file mode 100644 index 2371b2a..0000000 --- a/src/leap/common/events/events.proto +++ /dev/null @@ -1,145 +0,0 @@ -// signal.proto -// Copyright (C) 2013 LEAP -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package leap.common.events; -option py_generic_services = true; - - -// These are the events that can be signaled using the events mechanism. - -enum Event { - CLIENT_SESSION_ID = 1; - CLIENT_UID = 2; - SOLEDAD_CREATING_KEYS = 3; - SOLEDAD_DONE_CREATING_KEYS = 4; - SOLEDAD_UPLOADING_KEYS = 5; - SOLEDAD_DONE_UPLOADING_KEYS = 6; - SOLEDAD_DOWNLOADING_KEYS = 7; - SOLEDAD_DONE_DOWNLOADING_KEYS = 8; - SOLEDAD_NEW_DATA_TO_SYNC = 9; - SOLEDAD_DONE_DATA_SYNC = 10; - UPDATER_NEW_UPDATES = 11; - UPDATER_DONE_UPDATING = 12; - RAISE_WINDOW = 13; - SMTP_SERVICE_STARTED = 14; - SMTP_SERVICE_FAILED_TO_START = 15; - SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16; - SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17; - SMTP_RECIPIENT_REJECTED = 18; - SMTP_START_ENCRYPT_AND_SIGN = 19; - SMTP_END_ENCRYPT_AND_SIGN = 20; - SMTP_START_SIGN = 21; - SMTP_END_SIGN = 22; - SMTP_SEND_MESSAGE_START = 23; - SMTP_SEND_MESSAGE_SUCCESS = 24; - SMTP_SEND_MESSAGE_ERROR = 25; - SMTP_CONNECTION_LOST = 26; - IMAP_SERVICE_STARTED = 30; - IMAP_SERVICE_FAILED_TO_START = 31; - IMAP_CLIENT_LOGIN = 32; - IMAP_FETCHED_INCOMING = 33; - IMAP_MSG_PROCESSING = 34; - IMAP_MSG_DECRYPTED = 35; - IMAP_MSG_SAVED_LOCALLY = 36; - IMAP_MSG_DELETED_INCOMING = 37; - IMAP_UNHANDLED_ERROR = 38; - IMAP_UNREAD_MAIL = 39; - KEYMANAGER_LOOKING_FOR_KEY = 40; - KEYMANAGER_KEY_FOUND = 41; - KEYMANAGER_KEY_NOT_FOUND = 42; - KEYMANAGER_STARTED_KEY_GENERATION = 43; - KEYMANAGER_FINISHED_KEY_GENERATION = 44; - KEYMANAGER_DONE_UPLOADING_KEYS = 45; - SOLEDAD_INVALID_AUTH_TOKEN = 46; - SOLEDAD_SYNC_SEND_STATUS = 47; - SOLEDAD_SYNC_RECEIVE_STATUS = 48; -} - - -// A SignalRequest is the type of the message sent from one component to request -// that a signal be sent to every registered component. - -message SignalRequest { - required Event event = 1; - required string content = 2; - required string mac_method = 3; - required bytes mac = 4; - optional string enc_method = 5; - optional bool error_occurred = 6; -} - - -// A RegisterRequest message tells the server that a component wants to -// be signaled whenever a specific event occurs. - -message RegisterRequest { - required Event event = 1; - required int32 port = 2; - required string mac_method = 3; - required bytes mac = 4; -} - - -// An UnregisterRequest message tells the server that a component does not -// want to be signaled when a specific event occurs. - -message UnregisterRequest { - required Event event = 1; - required int32 port = 2; - required string mac_method = 3; - required bytes mac = 4; -} - - -// A PingRequest message is used to find out if a server or component is -// alive. - -message PingRequest { -} - - -// The EventResponse is the message sent back by server and components after -// they receive other kinds of requests. - -message EventResponse { - - enum Status { - OK = 1; - UNAUTH = 2; - ERROR = 3; - } - - required Status status = 1; - optional string result = 2; -} - - -// The EventsServerService is the service provided by the server. - -service EventsServerService { - rpc ping(PingRequest) returns (EventResponse); - rpc register(RegisterRequest) returns (EventResponse); - rpc unregister(UnregisterRequest) returns (EventResponse); - rpc signal(SignalRequest) returns (EventResponse); -} - - -// EventsComponentService is the service provided by components (clients). - -service EventsClientService { - rpc ping(PingRequest) returns (EventResponse); - rpc signal(SignalRequest) returns (EventResponse); -} diff --git a/src/leap/common/events/events_pb2.py b/src/leap/common/events/events_pb2.py deleted file mode 100644 index 2bf568f..0000000 --- a/src/leap/common/events/events_pb2.py +++ /dev/null @@ -1,635 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! - -from google.protobuf import descriptor -from google.protobuf import message -from google.protobuf import reflection -from google.protobuf import service -from google.protobuf import service_reflection -from google.protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - - - -DESCRIPTOR = descriptor.FileDescriptor( - name='events.proto', - package='leap.common.events', - serialized_pb='\n\x0c\x65vents.proto\x12\x12leap.common.events\"\x97\x01\n\rSignalRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0f\n\x07\x63ontent\x18\x02 \x02(\t\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\x12\x12\n\nenc_method\x18\x05 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x06 \x01(\x08\"j\n\x0fRegisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"l\n\x11UnregisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"\r\n\x0bPingRequest\"\x82\x01\n\rEventResponse\x12\x38\n\x06status\x18\x01 \x02(\x0e\x32(.leap.common.events.EventResponse.Status\x12\x0e\n\x06result\x18\x02 \x01(\t\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03*\x9f\n\n\x05\x45vent\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\x12\x17\n\x13UPDATER_NEW_UPDATES\x10\x0b\x12\x19\n\x15UPDATER_DONE_UPDATING\x10\x0c\x12\x10\n\x0cRAISE_WINDOW\x10\r\x12\x18\n\x14SMTP_SERVICE_STARTED\x10\x0e\x12 \n\x1cSMTP_SERVICE_FAILED_TO_START\x10\x0f\x12%\n!SMTP_RECIPIENT_ACCEPTED_ENCRYPTED\x10\x10\x12\'\n#SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED\x10\x11\x12\x1b\n\x17SMTP_RECIPIENT_REJECTED\x10\x12\x12\x1f\n\x1bSMTP_START_ENCRYPT_AND_SIGN\x10\x13\x12\x1d\n\x19SMTP_END_ENCRYPT_AND_SIGN\x10\x14\x12\x13\n\x0fSMTP_START_SIGN\x10\x15\x12\x11\n\rSMTP_END_SIGN\x10\x16\x12\x1b\n\x17SMTP_SEND_MESSAGE_START\x10\x17\x12\x1d\n\x19SMTP_SEND_MESSAGE_SUCCESS\x10\x18\x12\x1b\n\x17SMTP_SEND_MESSAGE_ERROR\x10\x19\x12\x18\n\x14SMTP_CONNECTION_LOST\x10\x1a\x12\x18\n\x14IMAP_SERVICE_STARTED\x10\x1e\x12 \n\x1cIMAP_SERVICE_FAILED_TO_START\x10\x1f\x12\x15\n\x11IMAP_CLIENT_LOGIN\x10 \x12\x19\n\x15IMAP_FETCHED_INCOMING\x10!\x12\x17\n\x13IMAP_MSG_PROCESSING\x10\"\x12\x16\n\x12IMAP_MSG_DECRYPTED\x10#\x12\x1a\n\x16IMAP_MSG_SAVED_LOCALLY\x10$\x12\x1d\n\x19IMAP_MSG_DELETED_INCOMING\x10%\x12\x18\n\x14IMAP_UNHANDLED_ERROR\x10&\x12\x14\n\x10IMAP_UNREAD_MAIL\x10\'\x12\x1e\n\x1aKEYMANAGER_LOOKING_FOR_KEY\x10(\x12\x18\n\x14KEYMANAGER_KEY_FOUND\x10)\x12\x1c\n\x18KEYMANAGER_KEY_NOT_FOUND\x10*\x12%\n!KEYMANAGER_STARTED_KEY_GENERATION\x10+\x12&\n\"KEYMANAGER_FINISHED_KEY_GENERATION\x10,\x12\"\n\x1eKEYMANAGER_DONE_UPLOADING_KEYS\x10-\x12\x1e\n\x1aSOLEDAD_INVALID_AUTH_TOKEN\x10.\x12\x1c\n\x18SOLEDAD_SYNC_SEND_STATUS\x10/\x12\x1f\n\x1bSOLEDAD_SYNC_RECEIVE_STATUS\x10\x30\x32\xdd\x02\n\x13\x45ventsServerService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12R\n\x08register\x12#.leap.common.events.RegisterRequest\x1a!.leap.common.events.EventResponse\x12V\n\nunregister\x12%.leap.common.events.UnregisterRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse2\xb1\x01\n\x13\x45ventsClientService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponseB\x03\x90\x01\x01') - -_EVENT = descriptor.EnumDescriptor( - name='Event', - full_name='leap.common.events.Event', - filename=None, - file=DESCRIPTOR, - values=[ - descriptor.EnumValueDescriptor( - name='CLIENT_SESSION_ID', index=0, number=1, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='CLIENT_UID', index=1, number=2, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_CREATING_KEYS', index=2, number=3, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_CREATING_KEYS', index=3, number=4, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_UPLOADING_KEYS', index=4, number=5, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_UPLOADING_KEYS', index=5, number=6, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DOWNLOADING_KEYS', index=6, number=7, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_DOWNLOADING_KEYS', index=7, number=8, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_NEW_DATA_TO_SYNC', index=8, number=9, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_DONE_DATA_SYNC', index=9, number=10, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='UPDATER_NEW_UPDATES', index=10, number=11, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='UPDATER_DONE_UPDATING', index=11, number=12, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='RAISE_WINDOW', index=12, number=13, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SERVICE_STARTED', index=13, number=14, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SERVICE_FAILED_TO_START', index=14, number=15, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_RECIPIENT_ACCEPTED_ENCRYPTED', index=15, number=16, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED', index=16, number=17, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_RECIPIENT_REJECTED', index=17, number=18, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_START_ENCRYPT_AND_SIGN', index=18, number=19, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_END_ENCRYPT_AND_SIGN', index=19, number=20, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_START_SIGN', index=20, number=21, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_END_SIGN', index=21, number=22, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SEND_MESSAGE_START', index=22, number=23, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SEND_MESSAGE_SUCCESS', index=23, number=24, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_SEND_MESSAGE_ERROR', index=24, number=25, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SMTP_CONNECTION_LOST', index=25, number=26, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_SERVICE_STARTED', index=26, number=30, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_SERVICE_FAILED_TO_START', index=27, number=31, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_CLIENT_LOGIN', index=28, number=32, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_FETCHED_INCOMING', index=29, number=33, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_PROCESSING', index=30, number=34, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_DECRYPTED', index=31, number=35, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_SAVED_LOCALLY', index=32, number=36, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_MSG_DELETED_INCOMING', index=33, number=37, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_UNHANDLED_ERROR', index=34, number=38, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='IMAP_UNREAD_MAIL', index=35, number=39, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_LOOKING_FOR_KEY', index=36, number=40, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_KEY_FOUND', index=37, number=41, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_KEY_NOT_FOUND', index=38, number=42, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_STARTED_KEY_GENERATION', index=39, number=43, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_FINISHED_KEY_GENERATION', index=40, number=44, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='KEYMANAGER_DONE_UPLOADING_KEYS', index=41, number=45, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_INVALID_AUTH_TOKEN', index=42, number=46, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_SYNC_SEND_STATUS', index=43, number=47, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SOLEDAD_SYNC_RECEIVE_STATUS', index=44, number=48, - options=None, - type=None), - ], - containing_type=None, - options=None, - serialized_start=557, - serialized_end=1868, -) - - -CLIENT_SESSION_ID = 1 -CLIENT_UID = 2 -SOLEDAD_CREATING_KEYS = 3 -SOLEDAD_DONE_CREATING_KEYS = 4 -SOLEDAD_UPLOADING_KEYS = 5 -SOLEDAD_DONE_UPLOADING_KEYS = 6 -SOLEDAD_DOWNLOADING_KEYS = 7 -SOLEDAD_DONE_DOWNLOADING_KEYS = 8 -SOLEDAD_NEW_DATA_TO_SYNC = 9 -SOLEDAD_DONE_DATA_SYNC = 10 -UPDATER_NEW_UPDATES = 11 -UPDATER_DONE_UPDATING = 12 -RAISE_WINDOW = 13 -SMTP_SERVICE_STARTED = 14 -SMTP_SERVICE_FAILED_TO_START = 15 -SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16 -SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17 -SMTP_RECIPIENT_REJECTED = 18 -SMTP_START_ENCRYPT_AND_SIGN = 19 -SMTP_END_ENCRYPT_AND_SIGN = 20 -SMTP_START_SIGN = 21 -SMTP_END_SIGN = 22 -SMTP_SEND_MESSAGE_START = 23 -SMTP_SEND_MESSAGE_SUCCESS = 24 -SMTP_SEND_MESSAGE_ERROR = 25 -SMTP_CONNECTION_LOST = 26 -IMAP_SERVICE_STARTED = 30 -IMAP_SERVICE_FAILED_TO_START = 31 -IMAP_CLIENT_LOGIN = 32 -IMAP_FETCHED_INCOMING = 33 -IMAP_MSG_PROCESSING = 34 -IMAP_MSG_DECRYPTED = 35 -IMAP_MSG_SAVED_LOCALLY = 36 -IMAP_MSG_DELETED_INCOMING = 37 -IMAP_UNHANDLED_ERROR = 38 -IMAP_UNREAD_MAIL = 39 -KEYMANAGER_LOOKING_FOR_KEY = 40 -KEYMANAGER_KEY_FOUND = 41 -KEYMANAGER_KEY_NOT_FOUND = 42 -KEYMANAGER_STARTED_KEY_GENERATION = 43 -KEYMANAGER_FINISHED_KEY_GENERATION = 44 -KEYMANAGER_DONE_UPLOADING_KEYS = 45 -SOLEDAD_INVALID_AUTH_TOKEN = 46 -SOLEDAD_SYNC_SEND_STATUS = 47 -SOLEDAD_SYNC_RECEIVE_STATUS = 48 - - -_EVENTRESPONSE_STATUS = descriptor.EnumDescriptor( - name='Status', - full_name='leap.common.events.EventResponse.Status', - filename=None, - file=DESCRIPTOR, - values=[ - descriptor.EnumValueDescriptor( - name='OK', index=0, number=1, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='UNAUTH', index=1, number=2, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='ERROR', index=2, number=3, - options=None, - type=None), - ], - containing_type=None, - options=None, - serialized_start=515, - serialized_end=554, -) - - -_SIGNALREQUEST = descriptor.Descriptor( - name='SignalRequest', - full_name='leap.common.events.SignalRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='event', full_name='leap.common.events.SignalRequest.event', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='content', full_name='leap.common.events.SignalRequest.content', index=1, - number=2, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac_method', full_name='leap.common.events.SignalRequest.mac_method', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac', full_name='leap.common.events.SignalRequest.mac', index=3, - number=4, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='enc_method', full_name='leap.common.events.SignalRequest.enc_method', index=4, - number=5, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='error_occurred', full_name='leap.common.events.SignalRequest.error_occurred', index=5, - number=6, type=8, cpp_type=7, label=1, - has_default_value=False, default_value=False, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=37, - serialized_end=188, -) - - -_REGISTERREQUEST = descriptor.Descriptor( - name='RegisterRequest', - full_name='leap.common.events.RegisterRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='event', full_name='leap.common.events.RegisterRequest.event', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='port', full_name='leap.common.events.RegisterRequest.port', index=1, - number=2, type=5, cpp_type=1, label=2, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac_method', full_name='leap.common.events.RegisterRequest.mac_method', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac', full_name='leap.common.events.RegisterRequest.mac', index=3, - number=4, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=190, - serialized_end=296, -) - - -_UNREGISTERREQUEST = descriptor.Descriptor( - name='UnregisterRequest', - full_name='leap.common.events.UnregisterRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='event', full_name='leap.common.events.UnregisterRequest.event', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='port', full_name='leap.common.events.UnregisterRequest.port', index=1, - number=2, type=5, cpp_type=1, label=2, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac_method', full_name='leap.common.events.UnregisterRequest.mac_method', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='mac', full_name='leap.common.events.UnregisterRequest.mac', index=3, - number=4, type=12, cpp_type=9, label=2, - has_default_value=False, default_value="", - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=298, - serialized_end=406, -) - - -_PINGREQUEST = descriptor.Descriptor( - name='PingRequest', - full_name='leap.common.events.PingRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=408, - serialized_end=421, -) - - -_EVENTRESPONSE = descriptor.Descriptor( - name='EventResponse', - full_name='leap.common.events.EventResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='status', full_name='leap.common.events.EventResponse.status', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=1, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='result', full_name='leap.common.events.EventResponse.result', index=1, - number=2, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - _EVENTRESPONSE_STATUS, - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=424, - serialized_end=554, -) - -_SIGNALREQUEST.fields_by_name['event'].enum_type = _EVENT -_REGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT -_UNREGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT -_EVENTRESPONSE.fields_by_name['status'].enum_type = _EVENTRESPONSE_STATUS -_EVENTRESPONSE_STATUS.containing_type = _EVENTRESPONSE; -DESCRIPTOR.message_types_by_name['SignalRequest'] = _SIGNALREQUEST -DESCRIPTOR.message_types_by_name['RegisterRequest'] = _REGISTERREQUEST -DESCRIPTOR.message_types_by_name['UnregisterRequest'] = _UNREGISTERREQUEST -DESCRIPTOR.message_types_by_name['PingRequest'] = _PINGREQUEST -DESCRIPTOR.message_types_by_name['EventResponse'] = _EVENTRESPONSE - -class SignalRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _SIGNALREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.SignalRequest) - -class RegisterRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _REGISTERREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.RegisterRequest) - -class UnregisterRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _UNREGISTERREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.UnregisterRequest) - -class PingRequest(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _PINGREQUEST - - # @@protoc_insertion_point(class_scope:leap.common.events.PingRequest) - -class EventResponse(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _EVENTRESPONSE - - # @@protoc_insertion_point(class_scope:leap.common.events.EventResponse) - - -_EVENTSSERVERSERVICE = descriptor.ServiceDescriptor( - name='EventsServerService', - full_name='leap.common.events.EventsServerService', - file=DESCRIPTOR, - index=0, - options=None, - serialized_start=1871, - serialized_end=2220, - methods=[ - descriptor.MethodDescriptor( - name='ping', - full_name='leap.common.events.EventsServerService.ping', - index=0, - containing_service=None, - input_type=_PINGREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='register', - full_name='leap.common.events.EventsServerService.register', - index=1, - containing_service=None, - input_type=_REGISTERREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='unregister', - full_name='leap.common.events.EventsServerService.unregister', - index=2, - containing_service=None, - input_type=_UNREGISTERREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='signal', - full_name='leap.common.events.EventsServerService.signal', - index=3, - containing_service=None, - input_type=_SIGNALREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), -]) - -class EventsServerService(service.Service): - __metaclass__ = service_reflection.GeneratedServiceType - DESCRIPTOR = _EVENTSSERVERSERVICE -class EventsServerService_Stub(EventsServerService): - __metaclass__ = service_reflection.GeneratedServiceStubType - DESCRIPTOR = _EVENTSSERVERSERVICE - - -_EVENTSCLIENTSERVICE = descriptor.ServiceDescriptor( - name='EventsClientService', - full_name='leap.common.events.EventsClientService', - file=DESCRIPTOR, - index=1, - options=None, - serialized_start=2223, - serialized_end=2400, - methods=[ - descriptor.MethodDescriptor( - name='ping', - full_name='leap.common.events.EventsClientService.ping', - index=0, - containing_service=None, - input_type=_PINGREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), - descriptor.MethodDescriptor( - name='signal', - full_name='leap.common.events.EventsClientService.signal', - index=1, - containing_service=None, - input_type=_SIGNALREQUEST, - output_type=_EVENTRESPONSE, - options=None, - ), -]) - -class EventsClientService(service.Service): - __metaclass__ = service_reflection.GeneratedServiceType - DESCRIPTOR = _EVENTSCLIENTSERVICE -class EventsClientService_Stub(EventsClientService): - __metaclass__ = service_reflection.GeneratedServiceStubType - DESCRIPTOR = _EVENTSCLIENTSERVICE - -# @@protoc_insertion_point(module_scope) diff --git a/src/leap/common/events/Makefile b/src/leap/common/events/flags.py index 5b7e60d..137f663 100644 --- a/src/leap/common/events/Makefile +++ b/src/leap/common/events/flags.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Makefile -# Copyright (C) 2013 LEAP +# __init__.py +# Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,18 +14,15 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Flags for the events framework. +""" +from leap.common.check import leap_assert -# This file is used to generate protobuf python files that are used for IPC: -# -# https://developers.google.com/protocol-buffers/docs/pythontutorial - -PROTOC = protoc - -all: events_pb2.py +EVENTS_ENABLED = True -%_pb2.py: %.proto - $(PROTOC) --python_out=./ $< -# autopep8 --in-place --aggressive $@ -clean: - rm -f *_pb2.py +def set_events_enabled(flag): + leap_assert(isinstance(flag, bool)) + global EVENTS_ENABLED + EVENTS_ENABLED = flag diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py index 41aede3..a69202e 100644 --- a/src/leap/common/events/server.py +++ b/src/leap/common/events/server.py @@ -14,223 +14,79 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -A server for the events mechanism. - -A server can receive different kinds of requests from clients: - 1. Registration request: store client port number to be notified when - a specific signal arrives. - 2. Signal request: redistribute the signal to registered clients. """ -import logging -import socket +The server for the events mechanism. +""" -from protobuf.socketrpc import RpcService -from leap.common.events import ( - events_pb2 as proto, - daemon, -) +import logging +import txzmq +from leap.common.zmq_utils import zmq_has_curve -logger = logging.getLogger(__name__) +from leap.common.events.zmq_components import TxZmqServerComponent -SERVER_PORT = 8090 +if zmq_has_curve(): + EMIT_ADDR = "tcp://127.0.0.1:9000" + REG_ADDR = "tcp://127.0.0.1:9001" +else: + EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0" + REG_ADDR = "ipc:///tmp/leap.common.events.socket.1" -# the `registered_clients` dictionary below should have the following -# format: -# -# { event_signal: [ port, ... ], ... } -# -registered_clients = {} - -class PortAlreadyTaken(Exception): - """ - Raised when trying to open a server in a port that is already taken. - """ - pass +logger = logging.getLogger(__name__) -def ensure_server(port=SERVER_PORT): +def ensure_server(emit_addr=EMIT_ADDR, reg_addr=REG_ADDR): """ - Make sure the server is running on the given port. + Make sure the server is running in the given addresses. - Attempt to connect to given local port. Upon success, assume that the - events server has already been started. Upon failure, start events server. + :param emit_addr: The address in which to receive events from clients. + :type emit_addr: str + :param reg_addr: The address to which publish events to clients. + :type reg_addr: str - :param port: the port in which server should be listening - :type port: int - - :return: the daemon instance or nothing - :rtype: EventsServerDaemon or None - - :raise PortAlreadyTaken: Raised if C{port} is already taken by something - that is not an events server. - """ - try: - # check if port is available - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', port)) - s.close() - # port is taken, check if there's a server running there - response = ping(port=port, timeout=1000) - if response is not None and response.status == proto.EventResponse.OK: - logger.info('A server is already running on port %d.', port) - return - # port is taken, and not by an events server - logger.warning( - 'Port %d is taken by something not an events server.', port) - raise PortAlreadyTaken(port) - except socket.error: - # port is available, run a server - logger.info('Launching server on port %d.', port) - return EventsServerDaemon.ensure(port) - - -def ping(port=SERVER_PORT, reqcbk=None, timeout=1000): + :return: an events server instance + :rtype: EventsServer """ - Ping the server. - - :param port: the port in which server should be listening - :type port: int - :param reqcbk: a callback to be called when a response from server is - received - :type reqcbk: function(proto.PingRequest, proto.EventResponse) - :param timeout: the timeout for synch calls - :type timeout: int - - :return: the response from server for synch calls or nothing for asynch - calls. - :rtype: leap.common.events.events_pb2.EventsResponse or None - """ - request = proto.PingRequest() - service = RpcService( - proto.EventsServerService_Stub, - port, - 'localhost') - logger.debug("Pinging server in port %d..." % port) - return service.ping(request, callback=reqcbk, timeout=timeout) + _server = EventsServer(emit_addr, reg_addr) + return _server -class EventsServerService(proto.EventsServerService): +class EventsServer(TxZmqServerComponent): """ - Service for receiving events in clients. + An events server that listens for events in one address and publishes those + events in another address. """ - def register(self, controller, request, done): - """ - Register a client port to be signaled when specific events come in. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.info("Received registration request: %s..." % str(request)[:40]) - # add client port to signal list - if request.event not in registered_clients: - registered_clients[request.event] = set([]) - registered_clients[request.event].add(request.port) - # send response back to client - - logger.debug('sending response back') - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - - def unregister(self, controller, request, done): - """ - Unregister a client port so it will not be signaled when specific - events come in. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.info( - "Received unregistration request: %s..." % str(request)[:40]) - # remove client port from signal list - response = proto.EventResponse() - if request.event in registered_clients: - try: - registered_clients[request.event].remove(request.port) - response.status = proto.EventResponse.OK - except KeyError: - response.status = proto.EventsResponse.ERROR - response.result = 'Port %d not registered.' % request.port - # send response back to client - logger.debug('sending response back') - done.run(response) - - def signal(self, controller, request, done): - """ - Perform an RPC call to signal all clients registered to receive a - specific signal. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.SignalRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.debug('Received signal from client: %s...', str(request)[:40]) - # send signal to all registered clients - # TODO: verify signal auth - if request.event in registered_clients: - for port in registered_clients[request.event]: - - def callback(req, resp): - logger.debug("Signal received by " + str(port)) - - service = RpcService(proto.EventsClientService_Stub, - port, 'localhost') - service.signal(request, callback=callback) - # send response back to client - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - - def ping(self, controller, request, done): + def __init__(self, emit_addr, reg_addr): """ - Reply to a ping request. - - :param controller: used to mediate a single method call - :type controller: protobuf.socketrpc.controller.SocketRpcController - :param request: the request received from the client - :type request: leap.common.events.events_pb2.RegisterRequest - :param done: callback to be called when done - :type done: protobuf.socketrpc.server.Callback - """ - logger.debug("Received ping request, sending response.") - response = proto.EventResponse() - response.status = proto.EventResponse.OK - done.run(response) - + Initialize the events server. -class EventsServerDaemon(daemon.EventsSingletonDaemon): - """ - Singleton class for starting an events server daemon. - """ - - @classmethod - def ensure(cls, port): + :param emit_addr: The address in which to receive events from clients. + :type emit_addr: str + :param reg_addr: The address to which publish events to clients. + :type reg_addr: str """ - Make sure the daemon is running on the given port. - - :param port: the port in which the daemon should listen - :type port: int + TxZmqServerComponent.__init__(self) + # bind PULL and PUB sockets + self._pull, self.pull_port = self._zmq_bind( + txzmq.ZmqPullConnection, emit_addr) + self._pub, self.pub_port = self._zmq_bind( + txzmq.ZmqPubConnection, reg_addr) + # set a handler for arriving messages + self._pull.onPull = self._onPull + + def _onPull(self, message): + """ + Callback executed when a message is pulled from a client. - :return: a daemon instance - :rtype: EventsServerDaemon + :param message: The message sent by the client. + :type message: str """ - return cls.ensure_service(port, EventsServerService()) + event, content = message[0].split(b"\0", 1) + logger.debug("Publishing event: %s" % event) + self._pub.publish(content, tag=event) diff --git a/src/leap/common/events/tests/__init__.py b/src/leap/common/events/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/common/events/tests/__init__.py diff --git a/src/leap/common/events/tests/test_zmq_components.py b/src/leap/common/events/tests/test_zmq_components.py new file mode 100644 index 0000000..c51e37e --- /dev/null +++ b/src/leap/common/events/tests/test_zmq_components.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# test_zmq_components.py +# Copyright (C) 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for the zmq_components module. +""" +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common.events import zmq_components + + +class AddrParseTestCase(unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_addr_parsing(self): + addr_re = zmq_components.ADDRESS_RE + + self.assertEqual( + addr_re.search("ipc:///tmp/foo/bar/baaz-2/foo.0").groups(), + ("ipc", "/tmp/foo/bar/baaz-2/foo.0", None)) + self.assertEqual( + addr_re.search("tcp://localhost:9000").groups(), + ("tcp", "localhost", "9000")) + self.assertEqual( + addr_re.search("tcp://127.0.0.1:9000").groups(), + ("tcp", "127.0.0.1", "9000")) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py new file mode 100644 index 0000000..dfd0533 --- /dev/null +++ b/src/leap/common/events/txclient.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# txclient.py +# Copyright (C) 2013, 2014, 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +The client end point of the events mechanism, implemented using txzmq. + +Clients are the communicating parties of the events mechanism. They +communicate by sending messages to a server, which in turn redistributes +messages to other clients. + +When a client registers a callback for a given event, it also tells the +server that it wants to be notified whenever events of that type are sent by +some other client. +""" +import logging +import pickle + +import txzmq + +from leap.common.events.zmq_components import TxZmqClientComponent +from leap.common.events.client import EventsClient +from leap.common.events.client import configure_client +from leap.common.events.server import EMIT_ADDR +from leap.common.events.server import REG_ADDR +from leap.common.events import catalog + + +logger = logging.getLogger(__name__) + + +__all__ = [ + "configure_client", + "EventsTxClient", + "register", + "unregister", + "emit", + "shutdown", +] + + +class EventsTxClient(TxZmqClientComponent, EventsClient): + """ + A twisted events client that listens for events in one address and + publishes those events to another address. + """ + + def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR, + path_prefix=None): + """ + Initialize the events server. + """ + TxZmqClientComponent.__init__(self, path_prefix=path_prefix) + EventsClient.__init__(self, emit_addr, reg_addr) + # connect SUB first, otherwise we might miss some event sent from this + # same client + self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr) + self._sub.gotMessage = self._gotMessage + self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr) + + def _gotMessage(self, msg, tag): + """ + Handle an incoming event. + + :param msg: The incoming message. + :type msg: list(str) + """ + event = getattr(catalog, tag) + content = pickle.loads(msg) + self._handle_event(event, content) + + def _subscribe(self, tag): + """ + Subscribe to a tag on the zmq SUB socket. + + :param tag: The tag to be subscribed. + :type tag: str + """ + self._sub.subscribe(tag) + + def _unsubscribe(self, tag): + """ + Unsubscribe from a tag on the zmq SUB socket. + + :param tag: The tag to be unsubscribed. + :type tag: str + """ + self._sub.unsubscribe(tag) + + def _send(self, data): + """ + Send data through PUSH socket. + + :param data: The data to be sent. + :type event: str + """ + self._push.send(data) + + def _run_callback(self, callback, event, content): + """ + Run a callback. + + :param callback: The callback to be run. + :type callback: callable(event, *content) + :param event: The event to be sent. + :type event: Event + :param content: The content of the event. + :type content: list + """ + callback(event, *content) + + def shutdown(self): + TxZmqClientComponent.shutdown(self) + EventsClient.shutdown(self) + + +def register(event, callback, uid=None, replace=False): + """ + Register a callback to be executed when an event is received. + + :param event: The event that triggers the callback. + :type event: str + :param callback: The callback to be executed. + :type callback: callable(event, content) + :param uid: The callback uid. + :type uid: str + :param replace: Wether an eventual callback with same ID should be + replaced. + :type replace: bool + + :return: The callback uid. + :rtype: str + + :raises CallbackAlreadyRegisteredError: when there's already a callback + identified by the given uid and replace is False. + """ + return EventsTxClient.instance().register( + event, callback, uid=uid, replace=replace) + + +def unregister(event, uid=None): + """ + Unregister callbacks for an event. + + If uid is not None, then only the callback identified by the given uid is + removed. Otherwise, all callbacks for the event are removed. + + :param event: The event that triggers the callback. + :type event: str + :param uid: The callback uid. + :type uid: str + """ + return EventsTxClient.instance().unregister(event, uid=uid) + + +def emit(event, *content): + """ + Send an event. + + :param event: The event to be sent. + :type event: str + :param content: The content of the event. + :type content: list + """ + return EventsTxClient.instance().emit(event, *content) + + +def shutdown(): + """ + Shutdown the events client. + """ + EventsTxClient.instance().shutdown() + + +def instance(): + """ + Return an instance of the events client. + + :return: An instance of the events client. + :rtype: EventsClientThread + """ + return EventsTxClient.instance() diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py new file mode 100644 index 0000000..f99c754 --- /dev/null +++ b/src/leap/common/events/zmq_components.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +# zmq.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +""" +The server for the events mechanism. +""" + + +import os +import logging +import txzmq +import re + +from abc import ABCMeta + +# XXX some distros don't package libsodium, so we have to be prepared for +# absence of zmq.auth +try: + import zmq.auth + from zmq.auth.thread import ThreadAuthenticator +except ImportError: + pass + +from leap.common.config import get_path_prefix +from leap.common.zmq_utils import zmq_has_curve +from leap.common.zmq_utils import maybe_create_and_get_certificates +from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX + + +logger = logging.getLogger(__name__) + + +ADDRESS_RE = re.compile("^([a-z]+)://([^:]+):?(\d+)?$") + + +class TxZmqComponent(object): + """ + A twisted-powered zmq events component. + """ + + __metaclass__ = ABCMeta + + _component_type = None + + def __init__(self, path_prefix=None): + """ + Initialize the txzmq component. + """ + self._factory = txzmq.ZmqFactory() + self._factory.registerForShutdown() + if path_prefix is None: + path_prefix = get_path_prefix() + self._config_prefix = os.path.join(path_prefix, "leap", "events") + self._connections = [] + + @property + def component_type(self): + if not self._component_type: + raise Exception( + "Make sure implementations of TxZmqComponent" + "define a self._component_type!") + return self._component_type + + def _zmq_connect(self, connClass, address): + """ + Connect to an address. + + :param connClass: The connection class to be used. + :type connClass: txzmq.ZmqConnection + :param address: The address to connect to. + :type address: str + + :return: The binded connection. + :rtype: txzmq.ZmqConnection + """ + connection = connClass(self._factory) + # create and configure socket + socket = connection.socket + if zmq_has_curve(): + public, secret = maybe_create_and_get_certificates( + self._config_prefix, self.component_type) + server_public_file = os.path.join( + self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) + socket.curve_publickey = public + socket.curve_secretkey = secret + socket.curve_serverkey = server_public + socket.connect(address) + logger.debug("Connected %s to %s." % (connClass, address)) + self._connections.append(connection) + return connection + + def _zmq_bind(self, connClass, address): + """ + Bind to an address. + + :param connClass: The connection class to be used. + :type connClass: txzmq.ZmqConnection + :param address: The address to bind to. + :type address: str + + :return: The binded connection and port. + :rtype: (txzmq.ZmqConnection, int) + """ + connection = connClass(self._factory) + socket = connection.socket + if zmq_has_curve(): + public, secret = maybe_create_and_get_certificates( + self._config_prefix, self.component_type) + socket.curve_publickey = public + socket.curve_secretkey = secret + self._start_thread_auth(connection.socket) + + proto, addr, port = ADDRESS_RE.search(address).groups() + + if proto == "tcp": + if port is None or port is '0': + params = proto, addr + port = socket.bind_to_random_port("%s://%s" % params) + logger.debug("Binded %s to %s://%s." % ((connClass,) + params)) + else: + params = proto, addr, int(port) + socket.bind("%s://%s:%d" % params) + logger.debug( + "Binded %s to %s://%s:%d." % ((connClass,) + params)) + else: + params = proto, addr + socket.bind("%s://%s" % params) + logger.debug( + "Binded %s to %s://%s" % ((connClass,) + params)) + self._connections.append(connection) + return connection, port + + def _start_thread_auth(self, socket): + """ + Start the zmq curve thread authenticator. + + :param socket: The socket in which to configure the authenticator. + :type socket: zmq.Socket + """ + authenticator = ThreadAuthenticator(self._factory.context) + authenticator.start() + # XXX do not hardcode this here. + authenticator.allow('127.0.0.1') + # tell authenticator to use the certificate in a directory + public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX) + authenticator.configure_curve(domain="*", location=public_keys_dir) + socket.curve_server = True # must come before bind + + def shutdown(self): + """ + Shutdown the component. + """ + logger.debug("Shutting down component %s." % str(self)) + for conn in self._connections: + conn.shutdown() + self._factory.shutdown() + + +class TxZmqServerComponent(TxZmqComponent): + """ + A txZMQ server component. + """ + + _component_type = "server" + + +class TxZmqClientComponent(TxZmqComponent): + """ + A txZMQ client component. + """ + + _component_type = "client" diff --git a/src/leap/common/http.py b/src/leap/common/http.py new file mode 100644 index 0000000..0dee3a2 --- /dev/null +++ b/src/leap/common/http.py @@ -0,0 +1,348 @@ +# -*- coding: utf-8 -*- +# http.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Twisted HTTP/HTTPS client. +""" + +try: + import twisted + assert twisted +except ImportError: + print "*******" + print "Twisted is needed to use leap.common.http module" + print "" + print "Install the extra requirement of the package:" + print "$ pip install leap.common[Twisted]" + import sys + sys.exit(1) + + +from leap.common.certs import get_compatible_ssl_context_factory +from leap.common.check import leap_assert + +from zope.interface import implements + +from twisted.internet import reactor +from twisted.internet import defer +from twisted.python import failure + +from twisted.web.client import Agent +from twisted.web.client import HTTPConnectionPool +from twisted.web.client import _HTTP11ClientFactory as HTTP11ClientFactory +from twisted.web.client import readBody +from twisted.web.http_headers import Headers +from twisted.web.iweb import IBodyProducer +from twisted.web._newclient import HTTP11ClientProtocol + + +__all__ = ["HTTPClient"] + + +# A default HTTP timeout is used for 2 distinct purposes: +# 1. as HTTP connection timeout, prior to connection estabilshment. +# 2. as data reception timeout, after the connection has been established. +DEFAULT_HTTP_TIMEOUT = 30 # seconds + + +class _HTTP11ClientFactory(HTTP11ClientFactory): + """ + A timeout-able HTTP 1.1 client protocol factory. + """ + + def __init__(self, quiescentCallback, timeout): + """ + :param quiescentCallback: The quiescent callback to be passed to + protocol instances, used to return them to + the connection pool. + :type quiescentCallback: callable(Protocol) + :param timeout: The timeout, in seconds, for requests made by + protocols created by this factory. + :type timeout: float + """ + HTTP11ClientFactory.__init__(self, quiescentCallback) + self._timeout = timeout + + def buildProtocol(self, _): + """ + Build the HTTP 1.1 client protocol. + """ + return _HTTP11ClientProtocol(self._quiescentCallback, self._timeout) + + +class _HTTPConnectionPool(HTTPConnectionPool): + """ + A timeout-able HTTP connection pool. + """ + + _factory = _HTTP11ClientFactory + + def __init__(self, reactor, persistent, timeout, maxPersistentPerHost=10): + HTTPConnectionPool.__init__(self, reactor, persistent=persistent) + self.maxPersistentPerHost = maxPersistentPerHost + self._timeout = timeout + + def _newConnection(self, key, endpoint): + def quiescentCallback(protocol): + self._putConnection(key, protocol) + factory = self._factory(quiescentCallback, timeout=self._timeout) + return endpoint.connect(factory) + + +class HTTPClient(object): + """ + HTTP client done the twisted way, with a main focus on pinning the SSL + certificate. + + By default, it uses a shared connection pool. If you want a dedicated + one, create and pass on __init__ pool parameter. + Please note that this client will limit the maximum amount of connections + by using a DeferredSemaphore. + This limit is equal to the maxPersistentPerHost used on pool and is needed + in order to avoid resource abuse on huge requests batches. + """ + + _pool = _HTTPConnectionPool( + reactor, + persistent=True, + timeout=DEFAULT_HTTP_TIMEOUT, + maxPersistentPerHost=10 + ) + + def __init__(self, cert_file=None, + timeout=DEFAULT_HTTP_TIMEOUT, pool=None): + """ + Init the HTTP client + + :param cert_file: The path to the certificate file, if None given the + system's CAs will be used. + :type cert_file: str + :param timeout: The amount of time that this Agent will wait for the + peer to accept a connection and for each request to be + finished. If a pool is passed, then this argument is + ignored. + :type timeout: float + """ + + self._timeout = timeout + self._pool = pool if pool is not None else self._pool + self._agent = Agent( + reactor, + get_compatible_ssl_context_factory(cert_file), + pool=self._pool, + connectTimeout=self._timeout) + self._semaphore = defer.DeferredSemaphore( + self._pool.maxPersistentPerHost) + + def _createPool(self, maxPersistentPerHost=10, persistent=True): + pool = _HTTPConnectionPool(reactor, persistent, self._timeout) + pool.maxPersistentPerHost = maxPersistentPerHost + return pool + + def _request(self, url, method, body, headers, callback): + """ + Perform an HTTP request. + + :param url: The URL for the request. + :type url: str + :param method: The HTTP method of the request. + :type method: str + :param body: The body of the request, if any. + :type body: str + :param headers: The headers of the request. + :type headers: dict + :param callback: A callback to be added to the request's deferred + callback chain. + :type callback: callable + + :return: A deferred that fires with the body of the request. + :rtype: twisted.internet.defer.Deferred + """ + if body: + body = _StringBodyProducer(body) + d = self._agent.request( + method, url, headers=Headers(headers), bodyProducer=body) + d.addCallback(callback) + return d + + def request(self, url, method='GET', body=None, headers={}, + callback=readBody): + """ + Perform an HTTP request, but limit the maximum amount of concurrent + connections. + + May be passed a callback to be added to the request's deferred + callback chain. The callback is expected to receive the response of + the request and may do whatever it wants with the response. By + default, if no callback is passed, we will use a simple body reader + which returns a deferred that is fired with the body of the response. + + :param url: The URL for the request. + :type url: str + :param method: The HTTP method of the request. + :type method: str + :param body: The body of the request, if any. + :type body: str + :param headers: The headers of the request. + :type headers: dict + :param callback: A callback to be added to the request's deferred + callback chain. + :type callback: callable + + :return: A deferred that fires with the body of the request. + :rtype: twisted.internet.defer.Deferred + """ + leap_assert( + callable(callback), + message="The callback parameter should be a callable!") + return self._semaphore.run(self._request, url, method, body, headers, + callback) + + def close(self): + """ + Close any cached connections. + """ + self._pool.closeCachedConnections() + +# +# An IBodyProducer to write the body of an HTTP request as a string. +# + + +class _StringBodyProducer(object): + """ + A producer that writes the body of a request to a consumer. + """ + + implements(IBodyProducer) + + def __init__(self, body): + """ + Initialize the string produer. + + :param body: The body of the request. + :type body: str + """ + self.body = body + self.length = len(body) + + def startProducing(self, consumer): + """ + Write the body to the consumer. + + :param consumer: Any IConsumer provider. + :type consumer: twisted.internet.interfaces.IConsumer + + :return: A successful deferred. + :rtype: twisted.internet.defer.Deferred + """ + consumer.write(self.body) + return defer.succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + + +# +# Patched twisted.web classes +# + +class _HTTP11ClientProtocol(HTTP11ClientProtocol): + """ + A timeout-able HTTP 1.1 client protocol, that is instantiated by the + _HTTP11ClientFactory below. + """ + + def __init__(self, quiescentCallback, timeout): + """ + Initialize the protocol. + + :param quiescentCallback: + :type quiescentCallback: callable + :param timeout: A timeout, in seconds, for requests made by this + protocol. + :type timeout: float + """ + HTTP11ClientProtocol.__init__(self, quiescentCallback) + self._timeout = timeout + self._timeoutCall = None + + def request(self, request): + """ + Issue request over self.transport and return a Deferred which + will fire with a Response instance or an error. + + :param request: The object defining the parameters of the request to + issue. + :type request: twisted.web._newclient.Request + + :return: A deferred which fires after the request has finished. + :rtype: Deferred + """ + d = HTTP11ClientProtocol.request(self, request) + if self._timeout: + self._last_buffer_len = 0 + timeoutCall = reactor.callLater( + self._timeout, self._doTimeout, request) + self._timeoutCall = timeoutCall + return d + + def _doTimeout(self, request): + """ + Give up the request because of a timeout. + + :param request: The object defining the parameters of the request to + issue. + :type request: twisted.web._newclient.Request + """ + self._giveUp( + failure.Failure( + defer.TimeoutError( + "Getting %s took longer than %s seconds." + % (request.absoluteURI, self._timeout)))) + + def _cancelTimeout(self): + """ + Cancel the request timeout, when it's finished. + """ + if self._timeoutCall and self._timeoutCall.active(): + self._timeoutCall.cancel() + self._timeoutCall = None + + def _finishResponse(self, rest): + """ + Cancel the timeout when finished receiving the response. + """ + self._cancelTimeout() + HTTP11ClientProtocol._finishResponse(self, rest) + + def dataReceived(self, bytes): + """ + Receive some data and extend the timeout period of this request. + + :param bytes: A string of indeterminate length. + :type bytes: str + """ + HTTP11ClientProtocol.dataReceived(self, bytes) + if self._timeoutCall and self._timeoutCall.active(): + self._timeoutCall.reset(self._timeout) + + def connectionLost(self, reason): + self._cancelTimeout() + return HTTP11ClientProtocol.connectionLost(self, reason) diff --git a/src/leap/common/plugins.py b/src/leap/common/plugins.py new file mode 100644 index 0000000..04152f9 --- /dev/null +++ b/src/leap/common/plugins.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# plugins.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Twisted plugins leap utilities. +""" +import os.path + +from twisted.plugin import getPlugins + +from leap.common.config import get_path_prefix + +# A whitelist of modules from where to collect plugins dynamically. +# For the moment restricted to leap namespace, but the idea is that we can pass +# other "trusted" modules as options to the initialization of soledad. + +# TODO discover all the namespace automagically + +PLUGGABLE_LEAP_MODULES = ('mail', 'keymanager') + +_preffix = get_path_prefix() +rc_file = os.path.join(_preffix, "leap", "leap.cfg") + + +def _get_extra_pluggable_modules(): + import ConfigParser + config = ConfigParser.RawConfigParser() + config.read(rc_file) + try: + modules = eval( + config.get('plugins', 'extra_pluggable_modules'), {}, {}) + except (ConfigParser.NoSectionError, ConfigParser.NoOptionError, + ConfigParser.MissingSectionHeaderError): + modules = [] + return modules + +if os.path.isfile(rc_file): + # TODO in the case of being called from the standalone client, + # we should pass the flag in some other way. + EXTRA_PLUGGABLE_MODULES = _get_extra_pluggable_modules() +else: + EXTRA_PLUGGABLE_MODULES = [] + + +def collect_plugins(interface): + """ + Traverse a whitelist of modules and collect all the plugins that implement + the passed interface. + """ + plugins = [] + for namespace in PLUGGABLE_LEAP_MODULES: + try: + module = __import__('leap.%s.plugins' % namespace, fromlist='.') + plugins = plugins + list(getPlugins(interface, module)) + except ImportError: + pass + for namespace in EXTRA_PLUGGABLE_MODULES: + try: + module = __import__('%s.plugins' % namespace, fromlist='.') + plugins = plugins + list(getPlugins(interface, module)) + except ImportError: + pass + return plugins diff --git a/src/leap/common/testing/basetest.py b/src/leap/common/testing/basetest.py index 54826d5..3d3cee0 100644 --- a/src/leap/common/testing/basetest.py +++ b/src/leap/common/testing/basetest.py @@ -28,8 +28,13 @@ except ImportError: import unittest from leap.common.check import leap_assert +from leap.common.events import server as events_server +from leap.common.events import client as events_client +from leap.common.events import flags, set_events_enabled from leap.common.files import mkdir_p, check_and_fix_urw_only +set_events_enabled(False) + class BaseLeapTest(unittest.TestCase): """ @@ -40,6 +45,14 @@ class BaseLeapTest(unittest.TestCase): @classmethod def setUpClass(cls): + cls.setUpEnv() + + @classmethod + def tearDownClass(cls): + cls.tearDownEnv() + + @classmethod + def setUpEnv(cls): """ Sets up common facilities for testing this TestCase: - custom PATH and HOME environmental variables @@ -48,6 +61,9 @@ class BaseLeapTest(unittest.TestCase): """ cls.old_path = os.environ['PATH'] cls.old_home = os.environ['HOME'] + cls.old_xdg_config = None + if "XDG_CONFIG_HOME" in os.environ: + cls.old_xdg_config = os.environ["XDG_CONFIG_HOME"] cls.tempdir = tempfile.mkdtemp(prefix="leap_tests-") cls.home = cls.tempdir bin_tdir = os.path.join( @@ -55,20 +71,41 @@ class BaseLeapTest(unittest.TestCase): 'bin') os.environ["PATH"] = bin_tdir os.environ["HOME"] = cls.tempdir + os.environ["XDG_CONFIG_HOME"] = os.path.join(cls.tempdir, ".config") + cls._init_events() @classmethod - def tearDownClass(cls): + def _init_events(cls): + if flags.EVENTS_ENABLED: + cls._server = events_server.ensure_server( + emit_addr="tcp://127.0.0.1:0", + reg_addr="tcp://127.0.0.1:0") + events_client.configure_client( + emit_addr="tcp://127.0.0.1:%d" % cls._server.pull_port, + reg_addr="tcp://127.0.0.1:%d" % cls._server.pub_port) + + @classmethod + def tearDownEnv(cls): """ Cleanup common facilities used for testing this TestCase: - restores the default PATH and HOME variables - removes the temporal folder """ + if flags.EVENTS_ENABLED: + events_client.shutdown() + cls._server.shutdown() + os.environ["PATH"] = cls.old_path os.environ["HOME"] = cls.old_home + if cls.old_xdg_config is not None: + os.environ["XDG_CONFIG_HOME"] = cls.old_xdg_config # safety check! please do not wipe my home... # XXX needs to adapt to non-linuces leap_assert( cls.tempdir.startswith('/tmp/leap_tests-') or + (cls.tempdir.startswith('/tmp/') and + cls.tempdir.startswith(tempfile.gettempdir()) and + 'leap_tests-' in cls.tempdir) or cls.tempdir.startswith('/var/folder'), "beware! tried to remove a dir which does not " "live in temporal folder!") diff --git a/src/leap/common/testing/leaptest_combined_keycert.pem b/src/leap/common/testing/leaptest_combined_keycert.pem new file mode 100644 index 0000000..f1e0fb2 --- /dev/null +++ b/src/leap/common/testing/leaptest_combined_keycert.pem @@ -0,0 +1,127 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDAT67c8y4ORa6o +/gor3gAouJI6W6R7YlPzzh4uaba4YhqgAEZzcZ7S589CqON8Eo8G2tQvh+ZSaNmP +idFDjFxfpb1sBMb9Nu720s0V8UMxzCDh7oyePZGdKP+F04+MkB0eHWSObBrzsjsj +KVsD/5fUeLbd1SdK1C2cKzsoRQ+GVEyMJeiISFlV05fxdAZSybPcwH+3aiVX+7cM +9ek8bufIeqrAfZVdvO/2Fuv6WfR2MEe32XO/9gj02XSGWd6sPXMn/lflKvNNKEWC +UWdqTArGi0CqQ4D+Q73ruUIthCJr+7LWK+7QKG980a1RR7GZJvs3skhUDh3eua2u +ICPiqQLlAgMBAAECggEBAKJr6j0UagaF1dFG1eJc2neKA36kXdQTpOIaaKU8haVO +vjv6X4YrJT/tpsAfEhp9Ni1M7r7CIcXiZjVz6bkKOA5UVhqAImxEVClEuw/YN688 +P11yc3NGftBkiwNFPk0yflUr7/zV0yGVm5rD1+oVme9KkO/kkg4CDA+E9664PTdu +S4NVvL6OgHUG2nucqIz0kzUBapo7okIkvggfldeDYF47rn/e0VikNpKkMzoCzWs+ +3ryzldfigwg18rE2nltQYk/Mi+H30iNTNEonLjs9YbiDPn8iy5SZ7xY4lAO1Urev +skdY04hmkOzh9JEETHeTj1LUIw7tfRdS3X6B+19gkCECgYEA+41UOLBb0ZHsGjsA +aZBtI4g8mvGTOFe/Az7Jm0404z6e8AIe4NeSJoJ6oV6WG2BmKvqTViYT2QnmauHA +WB1xJ5fH3PK7/LjICI2QE247E294+g5p1KYysALc2fChfJNpdfETzWJAUSSBFqxb +amCDZ4Gc8S/V44fEkJKPLuAvxL0CgYEAw7YyGsEX+lDGzumFQY4wz20+Cyvilnx0 +xoFGQ85RSprZttU64PID/u1HD9/Nv4lGcBDcTTtrmOEk9x51YzttrsPF9sn+Wj0y +eahuR32Qc1652Y7fAKgN2vIZRFBcGpAsemcoqmYFRMImX60G0areKaclooimTbJA +Si52P4IKnUkCgYAI+07ah1F/9hncBedJ3aJH9oFTdvSuulNTplZEeVJiGsZKA4le +tdO+FEKUqG/rolGDj1bbaJik0zmq70yS2NpFc6HrPa+Aoohh5cwTJYhudTh4lTMq +KJT+u9tu3KynagwF7gmq96scOpVxXc4VykRm2bXk1rRoX1yhXNpH7jFGcQKBgCn/ +v2DebzbYftGIa4BV80OQPfBHyqhgrO6sb1e9vtQzxuTlfW0ogpMCeG1/qbegzeze +sWghiEWWi0g80RQqfK80dBcx4dObrmlNK91LpOQdP+TgNBr/9Xk22xU96YYJyoG6 +AZAPtLG8uF9v0jbMZECsDfeDO60Qw5snvViDn6OBAoGACAbbQCbuh0cduDVwn0uz +8XDyRTKDhOsJ3p6UB1TjwvbLeoPI93Dv1gpEeoqELtCu+ZXr3+/i/IbwrxucrFNn +L/s+4zR2mlEAxBdtCFsH/Qf7+iYpBFSo4YReXz3xR+EVTmswzodJCbtF74oQSr6o +7d56Rd/5k2UkFeXnlGOLyAc= +-----END PRIVATE KEY----- +Certificate: + Data: + Version: 3 (0x2) + Serial Number: 4 (0x4) + Signature Algorithm: sha1WithRSAEncryption + Issuer: C=US, ST=CA, L=Cyberspace, O=LEAP Encryption Access Project, OU=cyberspace, CN=tests-leap.se/name=tests-leap/emailAddress=tests@leap.se + Validity + Not Before: Sep 3 17:52:16 2013 GMT + Not After : Sep 1 17:52:16 2023 GMT + Subject: C=US, ST=CA, L=Cyberspace, O=LEAP Encryption Access Project, OU=cyberspace, CN=localhost/name=tests-leap/emailAddress=tests@leap.se + Subject Public Key Info: + Public Key Algorithm: rsaEncryption + Public-Key: (2048 bit) + Modulus: + 00:c0:4f:ae:dc:f3:2e:0e:45:ae:a8:fe:0a:2b:de: + 00:28:b8:92:3a:5b:a4:7b:62:53:f3:ce:1e:2e:69: + b6:b8:62:1a:a0:00:46:73:71:9e:d2:e7:cf:42:a8: + e3:7c:12:8f:06:da:d4:2f:87:e6:52:68:d9:8f:89: + d1:43:8c:5c:5f:a5:bd:6c:04:c6:fd:36:ee:f6:d2: + cd:15:f1:43:31:cc:20:e1:ee:8c:9e:3d:91:9d:28: + ff:85:d3:8f:8c:90:1d:1e:1d:64:8e:6c:1a:f3:b2: + 3b:23:29:5b:03:ff:97:d4:78:b6:dd:d5:27:4a:d4: + 2d:9c:2b:3b:28:45:0f:86:54:4c:8c:25:e8:88:48: + 59:55:d3:97:f1:74:06:52:c9:b3:dc:c0:7f:b7:6a: + 25:57:fb:b7:0c:f5:e9:3c:6e:e7:c8:7a:aa:c0:7d: + 95:5d:bc:ef:f6:16:eb:fa:59:f4:76:30:47:b7:d9: + 73:bf:f6:08:f4:d9:74:86:59:de:ac:3d:73:27:fe: + 57:e5:2a:f3:4d:28:45:82:51:67:6a:4c:0a:c6:8b: + 40:aa:43:80:fe:43:bd:eb:b9:42:2d:84:22:6b:fb: + b2:d6:2b:ee:d0:28:6f:7c:d1:ad:51:47:b1:99:26: + fb:37:b2:48:54:0e:1d:de:b9:ad:ae:20:23:e2:a9: + 02:e5 + Exponent: 65537 (0x10001) + X509v3 extensions: + X509v3 Basic Constraints: + CA:FALSE + Netscape Cert Type: + SSL Server + Netscape Comment: + Easy-RSA Generated Server Certificate + X509v3 Subject Key Identifier: + 51:92:B6:A3:D7:D6:EC:8F:FC:16:C5:D4:0F:87:CC:EA:5C:7C:17:81 + X509v3 Authority Key Identifier: + keyid:36:19:70:96:A9:5C:FE:A3:82:0F:79:95:31:52:2B:4A:41:BD:81:CB + DirName:/C=US/ST=CA/L=Cyberspace/O=LEAP Encryption Access Project/OU=cyberspace/CN=tests-leap.se/name=tests-leap/emailAddress=tests@leap.se + serial:8B:BF:41:63:1E:10:6A:EC + + X509v3 Extended Key Usage: + TLS Web Server Authentication + X509v3 Key Usage: + Digital Signature, Key Encipherment + Signature Algorithm: sha1WithRSAEncryption + 88:d9:35:e0:d9:fa:fd:6b:57:e2:4d:f6:ef:91:6f:56:a6:2b: + 1a:1e:ec:8f:b0:18:e3:ec:ca:c9:1e:78:07:1d:0f:cf:fe:09: + 21:84:25:c4:27:ea:22:d7:48:53:73:ed:78:0f:42:5c:c7:f7: + 38:04:84:df:67:99:fd:75:6f:e8:dc:3b:91:ab:fa:c7:32:e5: + fd:3b:ce:de:7c:6a:df:39:46:1e:46:3a:4d:e1:e1:60:f3:bf: + aa:b2:0b:5d:ee:f2:0c:ee:82:7f:b5:02:75:04:47:d5:2b:c8: + e0:6d:6a:10:4a:ca:e0:c3:4f:ee:ff:15:71:37:f5:4d:95:38: + fe:a3:da:84:46:90:04:c2:61:86:a0:7f:e2:7d:62:46:6d:f6: + f8:90:51:88:c3:f2:8c:ca:b3:89:40:9f:6b:8b:33:65:e1:fd: + 0f:8b:d7:6a:93:dc:de:be:85:07:c7:d1:1d:b5:db:70:54:9f: + 95:d8:fb:11:f7:a7:e6:90:ba:9b:28:0e:3d:47:7a:63:6d:60: + 44:f6:96:aa:b6:a2:bc:0a:e5:25:c8:a2:74:91:54:95:bb:e2: + 09:01:56:73:6e:56:e8:6f:d6:a5:d8:18:96:c1:82:ef:2c:9e: + e2:4c:94:bc:00:71:5e:16:49:6b:e4:94:7a:d1:0c:2e:f4:19: + b2:2a:c2:b8 +-----BEGIN CERTIFICATE----- +MIIFdDCCBFygAwIBAgIBBDANBgkqhkiG9w0BAQUFADCBuDELMAkGA1UEBhMCVVMx +CzAJBgNVBAgTAkNBMRMwEQYDVQQHEwpDeWJlcnNwYWNlMScwJQYDVQQKEx5MRUFQ +IEVuY3J5cHRpb24gQWNjZXNzIFByb2plY3QxEzARBgNVBAsTCmN5YmVyc3BhY2Ux +FjAUBgNVBAMTDXRlc3RzLWxlYXAuc2UxEzARBgNVBCkTCnRlc3RzLWxlYXAxHDAa +BgkqhkiG9w0BCQEWDXRlc3RzQGxlYXAuc2UwHhcNMTMwOTAzMTc1MjE2WhcNMjMw +OTAxMTc1MjE2WjCBtDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAkNBMRMwEQYDVQQH +EwpDeWJlcnNwYWNlMScwJQYDVQQKEx5MRUFQIEVuY3J5cHRpb24gQWNjZXNzIFBy +b2plY3QxEzARBgNVBAsTCmN5YmVyc3BhY2UxEjAQBgNVBAMTCWxvY2FsaG9zdDET +MBEGA1UEKRMKdGVzdHMtbGVhcDEcMBoGCSqGSIb3DQEJARYNdGVzdHNAbGVhcC5z +ZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMBPrtzzLg5Frqj+Cive +ACi4kjpbpHtiU/POHi5ptrhiGqAARnNxntLnz0Ko43wSjwba1C+H5lJo2Y+J0UOM +XF+lvWwExv027vbSzRXxQzHMIOHujJ49kZ0o/4XTj4yQHR4dZI5sGvOyOyMpWwP/ +l9R4tt3VJ0rULZwrOyhFD4ZUTIwl6IhIWVXTl/F0BlLJs9zAf7dqJVf7twz16Txu +58h6qsB9lV287/YW6/pZ9HYwR7fZc7/2CPTZdIZZ3qw9cyf+V+Uq800oRYJRZ2pM +CsaLQKpDgP5Dveu5Qi2EImv7stYr7tAob3zRrVFHsZkm+zeySFQOHd65ra4gI+Kp +AuUCAwEAAaOCAYkwggGFMAkGA1UdEwQCMAAwEQYJYIZIAYb4QgEBBAQDAgZAMDQG +CWCGSAGG+EIBDQQnFiVFYXN5LVJTQSBHZW5lcmF0ZWQgU2VydmVyIENlcnRpZmlj +YXRlMB0GA1UdDgQWBBRRkraj19bsj/wWxdQPh8zqXHwXgTCB7QYDVR0jBIHlMIHi +gBQ2GXCWqVz+o4IPeZUxUitKQb2By6GBvqSBuzCBuDELMAkGA1UEBhMCVVMxCzAJ +BgNVBAgTAkNBMRMwEQYDVQQHEwpDeWJlcnNwYWNlMScwJQYDVQQKEx5MRUFQIEVu +Y3J5cHRpb24gQWNjZXNzIFByb2plY3QxEzARBgNVBAsTCmN5YmVyc3BhY2UxFjAU +BgNVBAMTDXRlc3RzLWxlYXAuc2UxEzARBgNVBCkTCnRlc3RzLWxlYXAxHDAaBgkq +hkiG9w0BCQEWDXRlc3RzQGxlYXAuc2WCCQCLv0FjHhBq7DATBgNVHSUEDDAKBggr +BgEFBQcDATALBgNVHQ8EBAMCBaAwDQYJKoZIhvcNAQEFBQADggEBAIjZNeDZ+v1r +V+JN9u+Rb1amKxoe7I+wGOPsyskeeAcdD8/+CSGEJcQn6iLXSFNz7XgPQlzH9zgE +hN9nmf11b+jcO5Gr+scy5f07zt58at85Rh5GOk3h4WDzv6qyC13u8gzugn+1AnUE +R9UryOBtahBKyuDDT+7/FXE39U2VOP6j2oRGkATCYYagf+J9YkZt9viQUYjD8ozK +s4lAn2uLM2Xh/Q+L12qT3N6+hQfH0R2123BUn5XY+xH3p+aQupsoDj1HemNtYET2 +lqq2orwK5SXIonSRVJW74gkBVnNuVuhv1qXYGJbBgu8snuJMlLwAcV4WSWvklHrR +DC70GbIqwrg= +-----END CERTIFICATE----- diff --git a/src/leap/common/testing/test_basetest.py b/src/leap/common/testing/test_basetest.py index cf0962d..ec42a62 100644 --- a/src/leap/common/testing/test_basetest.py +++ b/src/leap/common/testing/test_basetest.py @@ -83,12 +83,10 @@ class TestInitBaseLeapTest(BaseLeapTest): """ def setUp(self): - """nuke it""" - pass + self.setUpEnv() def tearDown(self): - """nuke it""" - pass + self.tearDownEnv() def test_path_is_changed(self): """tests whether we have changed the PATH env var""" diff --git a/src/leap/common/tests/test_certs.py b/src/leap/common/tests/test_certs.py new file mode 100644 index 0000000..8ebc0f4 --- /dev/null +++ b/src/leap/common/tests/test_certs.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# test_certs.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for: + * leap/common/certs.py +""" +import os +import time + +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common import certs +from leap.common.testing.basetest import BaseLeapTest + +TEST_CERT_PEM = os.path.join( + os.path.split(__file__)[0], + '..', 'testing', "leaptest_combined_keycert.pem") + +# Values from the test cert file: +# Not Before: Sep 3 17:52:16 2013 GMT +# Not After : Sep 1 17:52:16 2023 GMT +CERT_NOT_BEFORE = (2013, 9, 3, 17, 52, 16, 1, 246, 0) +CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0) + + +class CertsTest(BaseLeapTest): + + def setUp(self): + self.setUpEnv() + + def tearDown(self): + self.tearDownEnv() + + def test_should_redownload_if_no_cert(self): + self.assertTrue(certs.should_redownload(certfile="")) + + def test_should_redownload_if_invalid_pem(self): + cert_path = self.get_tempfile('test_pem_file.pem') + + with open(cert_path, 'w') as f: + f.write('this is some invalid data for the pem file') + + self.assertTrue(certs.should_redownload(cert_path)) + + def test_should_redownload_if_before(self): + def new_now(): + time.struct_time(CERT_NOT_BEFORE) + self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) + + def test_should_redownload_if_after(self): + def new_now(): + time.struct_time(CERT_NOT_AFTER) + self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now)) + + def test_not_should_redownload(self): + self.assertFalse(certs.should_redownload(TEST_CERT_PEM)) + + def test_is_valid_pemfile(self): + with open(TEST_CERT_PEM) as f: + cert_data = f.read() + + self.assertTrue(certs.is_valid_pemfile(cert_data)) + + def test_not_is_valid_pemfile(self): + cert_data = 'this is some invalid data for the pem file' + + self.assertFalse(certs.is_valid_pemfile(cert_data)) + + def test_get_cert_time_boundaries(self): + """ + This test ensures us that the returned values are returned in UTC/GMT. + """ + with open(TEST_CERT_PEM) as f: + cert_data = f.read() + + valid_from, valid_to = certs.get_cert_time_boundaries(cert_data) + self.assertEqual(tuple(valid_from), CERT_NOT_BEFORE) + self.assertEqual(tuple(valid_to), CERT_NOT_AFTER) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py index 0779b2e..2ad097e 100644 --- a/src/leap/common/tests/test_events.py +++ b/src/leap/common/tests/test_events.py @@ -1,4 +1,4 @@ -## -*- coding: utf-8 -*- +# -*- coding: utf-8 -*- # test_events.py # Copyright (C) 2013 LEAP # @@ -15,414 +15,184 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import unittest -import sets -import time -import socket -import threading -import random - - -from mock import Mock -from protobuf.socketrpc import RpcService -from leap.common import events -from leap.common.events import ( - server, - client, - mac_auth, -) -from leap.common.events.events_pb2 import ( - EventsServerService, - EventsServerService_Stub, - EventsClientService_Stub, - EventResponse, - SignalRequest, - RegisterRequest, - PingRequest, - SOLEDAD_CREATING_KEYS, - CLIENT_UID, -) +import os +import logging +import time -port = 8090 +from twisted.internet.reactor import callFromThread +from twisted.trial import unittest +from twisted.internet import defer -received = False +from leap.common.events import server +from leap.common.events import client +from leap.common.events import flags +from leap.common.events import txclient +from leap.common.events import catalog +from leap.common.events.errors import CallbackAlreadyRegisteredError -class EventsTestCase(unittest.TestCase): +if 'DEBUG' in os.environ: + logging.basicConfig(level=logging.DEBUG) - @classmethod - def setUpClass(cls): - server.EventsServerDaemon.ensure(8090) - cls.callbacks = events.client.registered_callbacks - @classmethod - def tearDownClass(cls): - # give some time for requests to be processed. - time.sleep(1) +class EventsGenericClientTestCase(object): def setUp(self): - super(EventsTestCase, self).setUp() + flags.set_events_enabled(True) + self._server = server.ensure_server( + emit_addr="tcp://127.0.0.1:0", + reg_addr="tcp://127.0.0.1:0") + self._client.configure_client( + emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port, + reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port) def tearDown(self): - #events.client.registered_callbacks = {} - server.registered_clients = {} - super(EventsTestCase, self).tearDown() - - def test_service_singleton(self): - """ - Ensure that there's always just one instance of the server daemon - running. - """ - service1 = server.EventsServerDaemon.ensure(8090) - service2 = server.EventsServerDaemon.ensure(8090) - self.assertEqual(service1, service2, - "Can't get singleton class for service.") + self._client.shutdown() + self._server.shutdown() + flags.set_events_enabled(False) + # wait a bit for sockets to close properly + time.sleep(0.1) def test_client_register(self): """ Ensure clients can register callbacks. """ - self.assertTrue(1 not in self.callbacks, - 'There should should be no callback for this signal.') - events.register(1, lambda x: True) - self.assertTrue(1 in self.callbacks, - 'Could not register signal in local client.') - events.register(2, lambda x: True) - self.assertTrue(1 in self.callbacks, - 'Could not register signal in local client.') - self.assertTrue(2 in self.callbacks, - 'Could not register signal in local client.') + callbacks = self._client.instance().callbacks + self.assertTrue(len(callbacks) == 0, + 'There should be no callback for this event.') + # register one event + event1 = catalog.CLIENT_UID - def test_register_signal_replace(self): - """ - Make sure clients can replace already registered callbacks. - """ - sig = 3 - cbk = lambda x: True - events.register(sig, cbk, uid=1) - self.assertRaises(Exception, events.register, sig, lambda x: True, - uid=1) - events.register(sig, lambda x: True, uid=1, replace=True) - self.assertTrue(sig in self.callbacks, 'Could not register signal.') - self.assertEqual(1, len(self.callbacks[sig]), - 'Wrong number of registered callbacks.') - - def test_signal_response_status(self): - """ - Ensure there's an appropriate response from server when signaling. - """ - sig = 4 - request = SignalRequest() - request.event = sig - request.content = 'my signal contents' - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - # test synch - response = service.signal(request, timeout=1000) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_signal_executes_callback(self): - """ - Ensure callback is executed upon receiving signal. - """ - sig = CLIENT_UID - request = SignalRequest() - request.event = sig - request.content = 'my signal contents' - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - - # register a callback - flag = Mock() - events.register(sig, lambda req: flag(req.event)) - # signal - response = service.signal(request) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - time.sleep(1) # wait for signal to arrive - flag.assert_called_once_with(sig) - - def test_events_server_service_register(self): - """ - Ensure the server can register clients to be signaled. - """ - sig = 5 - request = RegisterRequest() - request.event = sig - request.port = 8091 - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - complist = server.registered_clients - self.assertEqual({}, complist, - 'There should be no registered_ports when ' - 'server has just been created.') - response = service.register(request, timeout=1000) - self.assertTrue(sig in complist, "Signal not registered succesfully.") - self.assertTrue(8091 in complist[sig], - 'Failed registering client port.') - - def test_client_request_register(self): - """ - Ensure clients can register themselves with server. - """ - sig = 6 - complist = server.registered_clients - self.assertTrue(sig not in complist, - 'There should be no registered clients for this ' - 'signal.') - events.register(sig, lambda x: True) - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertTrue(sig in complist, 'Failed registering client.') - self.assertTrue(port in complist[sig], - 'Failed registering client port.') + def cbk1(event, _): + return True - def test_client_receives_signal(self): - """ - Ensure clients can receive signals. - """ - sig = 7 - flag = Mock() - - events.register(sig, lambda req: flag(req.event)) - request = SignalRequest() - request.event = sig - request.content = "" - request.mac_method = mac_auth.MacMethod.MAC_NONE - request.mac = "" - service = RpcService(EventsServerService_Stub, port, 'localhost') - response = service.signal(request, timeout=1000) - self.assertTrue(response is not None, 'Did not receive response.') - time.sleep(0.5) - flag.assert_called_once_with(sig) - - def test_client_send_signal(self): - """ - Ensure clients can send signals. - """ - sig = 8 - response = events.signal(sig) - self.assertTrue(response.status == response.OK, - 'Received wrong response status when signaling.') + uid1 = self._client.register(event1, cbk1) + # assert for correct registration + self.assertTrue(len(callbacks) == 1) + self.assertTrue(callbacks[event1][uid1] == cbk1, + 'Could not register event in local client.') + # register another event + event2 = catalog.CLIENT_SESSION_ID - def test_client_unregister_all(self): - """ - Test that the client can unregister all events for one signal. - """ - sig = CLIENT_UID - complist = server.registered_clients - events.register(sig, lambda x: True) - events.register(sig, lambda x: True) - time.sleep(0.1) - events.unregister(sig) - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertFalse(bool(complist[sig])) - self.assertTrue(port not in complist[sig]) + def cbk2(event, _): + return True - def test_client_unregister_by_uid(self): - """ - Test that the client can unregister an event by uid. - """ - sig = CLIENT_UID - complist = server.registered_clients - events.register(sig, lambda x: True, uid='cbkuid') - events.register(sig, lambda x: True, uid='cbkuid2') - time.sleep(0.1) - events.unregister(sig, uid='cbkuid') - time.sleep(0.1) - port = client.EventsClientDaemon.get_instance().get_port() - self.assertTrue(sig in complist) - self.assertTrue(len(complist[sig]) == 1) - self.assertTrue( - client.registered_callbacks[sig].pop()[0] == 'cbkuid2') - self.assertTrue(port in complist[sig]) - - def test_server_replies_ping(self): - """ - Ensure server replies to a ping. - """ - request = PingRequest() - service = RpcService(EventsServerService_Stub, port, 'localhost') - response = service.ping(request, timeout=1000) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_client_replies_ping(self): - """ - Ensure clients reply to a ping. - """ - daemon = client.ensure_client_daemon() - port = daemon.get_port() - request = PingRequest() - service = RpcService(EventsClientService_Stub, port, 'localhost') - response = service.ping(request, timeout=1000) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') - - def test_server_ping(self): - """ - Ensure the function from server module pings correctly. - """ - response = server.ping() - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') + uid2 = self._client.register(event2, cbk2) + # assert for correct registration + self.assertTrue(len(callbacks) == 2) + self.assertTrue(callbacks[event2][uid2] == cbk2, + 'Could not register event in local client.') - def test_client_ping(self): + def test_register_signal_replace(self): """ - Ensure the function from client module pings correctly. + Make sure clients can replace already registered callbacks. """ - daemon = client.ensure_client_daemon() - response = client.ping(daemon.get_port()) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') + event = catalog.CLIENT_UID + d = defer.Deferred() - def test_module_ping_server(self): - """ - Ensure the function from main module pings server correctly. - """ - response = events.ping_server() - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') + def cbk_fail(event, _): + return callFromThread(d.errback, event) - def test_module_ping_client(self): - """ - Ensure the function from main module pings clients correctly. - """ - daemon = client.ensure_client_daemon() - response = events.ping_client(daemon.get_port()) - self.assertIsNotNone(response) - self.assertEqual(EventResponse.OK, response.status, - 'Wrong response status.') + def cbk_succeed(event, _): + return callFromThread(d.callback, event) + + self._client.register(event, cbk_fail, uid=1) + self._client.register(event, cbk_succeed, uid=1, replace=True) + self._client.emit(event, None) + return d - def test_ensure_server_raises_if_port_taken(self): + def test_register_signal_replace_fails_when_replace_is_false(self): """ - Verify that server raises an exception if port is already taken. + Make sure clients trying to replace already registered callbacks fail + when replace=False """ - # get a random free port - while True: - port = random.randint(1024, 65535) - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect(('localhost', port)) - s.close() - except: - break - - class PortBlocker(threading.Thread): - - def run(self): - conns = 0 - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind(('localhost', port)) - s.setblocking(1) - s.listen(1) - while conns < 2: # blocks until rece - conns += 1 - s.accept() - s.close() - - # block the port - taker = PortBlocker() - taker.start() - time.sleep(1) # wait for thread to start. + event = catalog.CLIENT_UID + self._client.register(event, lambda event, _: None, uid=1) self.assertRaises( - server.PortAlreadyTaken, server.ensure_server, port) + CallbackAlreadyRegisteredError, + self._client.register, + event, lambda event, _: None, uid=1, replace=False) - def test_async_register(self): + def test_register_more_than_one_callback_works(self): """ - Test asynchronous registering of callbacks. + Make sure clients can replace already registered callbacks. """ - flag = Mock() + event = catalog.CLIENT_UID + d1 = defer.Deferred() + + def cbk1(event, _): + return callFromThread(d1.callback, event) - # executed after async register, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) + d2 = defer.Deferred() - # callback registered by application - def callback(request): - pass + def cbk2(event, _): + return d2.callback(event) - # passing a callback as reqcbk param makes the call asynchronous - result = events.register(CLIENT_UID, callback, reqcbk=reqcbk) - self.assertIsNone(result) - events.signal(CLIENT_UID) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) + self._client.register(event, cbk1) + self._client.register(event, cbk2) + self._client.emit(event, None) + d = defer.gatherResults([d1, d2]) + return d - def test_async_signal(self): + def test_client_receives_signal(self): """ - Test asynchronous registering of callbacks. + Ensure clients can receive signals. """ - flag = Mock() + event = catalog.CLIENT_UID + d = defer.Deferred() - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) + def cbk(events, _): + callFromThread(d.callback, event) - # passing a callback as reqcbk param makes the call asynchronous - result = events.signal(CLIENT_UID, reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) + self._client.register(event, cbk) + self._client.emit(event, None) + return d - def test_async_unregister(self): + def test_client_unregister_all(self): """ - Test asynchronous unregistering of callbacks. + Test that the client can unregister all events for one signal. """ - flag = Mock() + event1 = catalog.CLIENT_UID + d = defer.Deferred() + # register more than one callback for the same event + self._client.register( + event1, lambda ev, _: callFromThread(d.errback, None)) + self._client.register( + event1, lambda ev, _: callFromThread(d.errback, None)) + # unregister and emit the event + self._client.unregister(event1) + self._client.emit(event1, None) + # register and emit another event so the deferred can succeed + event2 = catalog.CLIENT_SESSION_ID + self._client.register( + event2, lambda ev, _: callFromThread(d.callback, None)) + self._client.emit(event2, None) + return d - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(request.event, response.status) - - # callback registered by application - def callback(request): - pass - - # passing a callback as reqcbk param makes the call asynchronous - events.register(CLIENT_UID, callback) - result = events.unregister(CLIENT_UID, reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for signal to arrive from server - flag.assert_called_once_with(CLIENT_UID, EventResponse.OK) - - def test_async_ping_server(self): + def test_client_unregister_by_uid(self): """ - Test asynchronous pinging of server. + Test that the client can unregister an event by uid. """ - flag = Mock() + event = catalog.CLIENT_UID + d = defer.Deferred() + # register one callback that would fail + uid = self._client.register( + event, lambda ev, _: callFromThread(d.errback, None)) + # register one callback that will succeed + self._client.register( + event, lambda ev, _: callFromThread(d.callback, None)) + # unregister by uid and emit the event + self._client.unregister(event, uid=uid) + self._client.emit(event, None) + return d - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(response.status) - result = events.ping_server(reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for response to arrive from server. - flag.assert_called_once_with(EventResponse.OK) +class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase): + + _client = txclient - def test_async_ping_client(self): - """ - Test asynchronous pinging of client. - """ - flag = Mock() - # executed after async signal, when response is received from server - def reqcbk(request, response): - flag(response.status) +class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase): - daemon = client.ensure_client_daemon() - result = events.ping_client(daemon.get_port(), reqcbk=reqcbk) - self.assertIsNone(result) - time.sleep(1) # wait for response to arrive from server. - flag.assert_called_once_with(EventResponse.OK) + _client = client diff --git a/src/leap/common/tests/test_http.py b/src/leap/common/tests/test_http.py new file mode 100644 index 0000000..f44550f --- /dev/null +++ b/src/leap/common/tests/test_http.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +# test_http.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Tests for: + * leap/common/http.py +""" +import os +try: + import unittest2 as unittest +except ImportError: + import unittest + +from leap.common import http +from leap.common.testing.basetest import BaseLeapTest + +TEST_CERT_PEM = os.path.join( + os.path.split(__file__)[0], + '..', 'testing', "leaptest_combined_keycert.pem") + + +class HTTPClientTest(BaseLeapTest): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_agents_sharing_pool_by_default(self): + client = http.HTTPClient() + client2 = http.HTTPClient(TEST_CERT_PEM) + self.assertNotEquals( + client._agent, client2._agent, "Expected dedicated agents") + self.assertEquals( + client._agent._pool, client2._agent._pool, + "Pool was not reused by default") + + def test_agent_can_have_dedicated_custom_pool(self): + custom_pool = http._HTTPConnectionPool( + None, + timeout=10, + maxPersistentPerHost=42, + persistent=False + ) + self.assertEquals(custom_pool.maxPersistentPerHost, 42, + "Custom persistent connections " + "limit is not being respected") + self.assertFalse(custom_pool.persistent, + "Custom persistence is not being respected") + default_client = http.HTTPClient() + custom_client = http.HTTPClient(pool=custom_pool) + + self.assertNotEquals( + default_client._agent, custom_client._agent, + "No agent reuse is expected") + self.assertEquals( + custom_pool, custom_client._agent._pool, + "Custom pool usage was not respected") + +if __name__ == "__main__": + unittest.main() diff --git a/src/leap/common/zmq_utils.py b/src/leap/common/zmq_utils.py new file mode 100644 index 0000000..0a781de --- /dev/null +++ b/src/leap/common/zmq_utils.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# zmq.py +# Copyright (C) 2013, 2014 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Utilities to handle ZMQ certificates. +""" +import os +import logging +import stat +import shutil + +import zmq + +try: + import zmq.auth +except ImportError: + pass + +from leap.common.files import mkdir_p +from leap.common.check import leap_assert + +logger = logging.getLogger(__name__) + + +KEYS_PREFIX = "zmq_certificates" +PUBLIC_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "public_keys") +PRIVATE_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "private_keys") + + +def zmq_has_curve(): + """ + Return whether the current ZMQ has support for auth and CurveZMQ security. + + :rtype: bool + + Version notes: + `zmq.curve_keypair()` is new in version 14.0, new in version libzmq-4.0. + Requires libzmq (>= 4.0) to have been linked with libsodium. + `zmq.auth` module is new in version 14.1 + `zmq.has()` is new in version 14.1, new in version libzmq-4.1. + """ + zmq_version = zmq.zmq_version_info() + pyzmq_version = zmq.pyzmq_version_info() + + if pyzmq_version >= (14, 1, 0) and zmq_version >= (4, 1): + return zmq.has('curve') + + if pyzmq_version < (14, 1, 0): + return False + + if zmq_version < (4, 0): + # security is new in libzmq 4.0 + return False + + try: + zmq.curve_keypair() + except zmq.error.ZMQError: + # security requires libzmq to be linked against libsodium + return False + + return True + + +def assert_zmq_has_curve(): + leap_assert(zmq_has_curve, "CurveZMQ not supported!") + + +def maybe_create_and_get_certificates(basedir, name): + """ + Generate the needed ZMQ certificates for backend/frontend communication if + needed. + """ + assert_zmq_has_curve() + private_keys_dir = os.path.join(basedir, PRIVATE_KEYS_PREFIX) + private_key = os.path.join( + private_keys_dir, name + ".key_secret") + if not os.path.isfile(private_key): + mkdir_p(private_keys_dir) + zmq.auth.create_certificates(private_keys_dir, name) + # set permissions to: 0700 (U:rwx G:--- O:---) + os.chmod(private_key, stat.S_IRUSR | stat.S_IWUSR) + # move public key to public keys directory + public_keys_dir = os.path.join(basedir, PUBLIC_KEYS_PREFIX) + old_public_key = os.path.join( + private_keys_dir, name + ".key") + new_public_key = os.path.join( + public_keys_dir, name + ".key") + mkdir_p(public_keys_dir) + shutil.move(old_public_key, new_public_key) + return zmq.auth.load_certificate(private_key) |