summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2015-08-28 10:59:30 -0400
committerKali Kaneko <kali@leap.se>2015-08-28 10:59:30 -0400
commit8fa97c02b5f07f896e52d9bb272128f267af04ea (patch)
treee0ce6bbaaa48441ebcfb807a45e8753fe8432423
parentd272a953a01f5c601e4894a916f7b4d990a03327 (diff)
parentccecd1b3750bd10404511c33be1aaca82631a502 (diff)
Merge tag '0.4.2' into debian/experimental
Tag leap.common version 0.4.2 Conflicts: pkg/requirements-testing.pip setup.cfg src/leap/common/_version.py src/leap/common/events/events_pb2.py
-rw-r--r--.gitignore1
-rw-r--r--AUTHORS7
-rw-r--r--CHANGELOG26
-rw-r--r--README.rst1
-rwxr-xr-xpkg/generate_wheels.sh14
-rwxr-xr-xpkg/pip_install_requirements.sh86
-rw-r--r--pkg/requirements-testing.pip3
-rw-r--r--pkg/requirements.pip4
-rwxr-xr-xpkg/tools/get_authors.sh2
-rw-r--r--pkg/utils.py6
-rw-r--r--setup.cfg10
-rw-r--r--setup.py7
-rw-r--r--src/leap/common/__init__.py4
-rw-r--r--src/leap/common/_version.py10
-rw-r--r--src/leap/common/ca_bundle.py1
-rw-r--r--src/leap/common/certs.py35
-rw-r--r--src/leap/common/config/pluggableconfig.py27
-rw-r--r--src/leap/common/config/tests/test_baseconfig.py30
-rw-r--r--src/leap/common/decorators.py101
-rw-r--r--src/leap/common/events/README.rst117
-rw-r--r--src/leap/common/events/__init__.py318
-rw-r--r--src/leap/common/events/catalog.py88
-rw-r--r--src/leap/common/events/client.py743
-rw-r--r--src/leap/common/events/daemon.py208
-rw-r--r--src/leap/common/events/errors.py (renamed from src/leap/common/events/mac_auth.py)18
-rw-r--r--src/leap/common/events/events.proto145
-rw-r--r--src/leap/common/events/events_pb2.py635
-rw-r--r--src/leap/common/events/flags.py (renamed from src/leap/common/events/Makefile)25
-rw-r--r--src/leap/common/events/server.py242
-rw-r--r--src/leap/common/events/tests/__init__.py0
-rw-r--r--src/leap/common/events/tests/test_zmq_components.py51
-rw-r--r--src/leap/common/events/txclient.py194
-rw-r--r--src/leap/common/events/zmq_components.py188
-rw-r--r--src/leap/common/http.py348
-rw-r--r--src/leap/common/plugins.py76
-rw-r--r--src/leap/common/testing/basetest.py39
-rw-r--r--src/leap/common/testing/leaptest_combined_keycert.pem127
-rw-r--r--src/leap/common/testing/test_basetest.py6
-rw-r--r--src/leap/common/tests/test_certs.py99
-rw-r--r--src/leap/common/tests/test_events.py492
-rw-r--r--src/leap/common/tests/test_http.py75
-rw-r--r--src/leap/common/zmq_utils.py103
42 files changed, 2615 insertions, 2097 deletions
diff --git a/.gitignore b/.gitignore
index 25876cf..0e26c09 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@
dist/
build/
MANIFEST
+_trial_temp
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..e47319e
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,7 @@
+Tomás Touceda <chiiph@leap.se>
+Kali Kaneko <kali@leap.se>
+drebs <drebs@leap.se>
+Ivan Alejandro <ivanalejandro0@gmail.com>
+Ruben Pollan <meskio@sindominio.net>
+Bruno Wagner <bwgpro@gmail.com>
+Victor Shyba <victor.shyba@gmail.com>
diff --git a/CHANGELOG b/CHANGELOG
index 8895de3..c34dc2f 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,29 @@
+0.4.2 Aug 26, 2015:
+ o Add http request timeout. Related to #7234.
+ o Add a flag to disable events framework. Closes: #7259
+ o Allow passing callback to HTTP client.
+ o Bugfix: do not add a port string to non-tcp addresses.
+ o Add close method for http agent.
+ o Fix code style and tests.
+ o Bugfix: HTTP timeout was not being cleared on abort.
+
+0.4.1 Jul 10, 2015:
+ o Fix regexp to allow ipc protocol in zmq sockets. Closes: #7089.
+ o Remove extraneous data from events logs. Closes #7130.
+ o Make https client use Twisted SSL validation and adds a reuse by default
+ behavior on connection pool
+
+0.4.0 Jun 1, 2015:
+ o Modify leap.common.events to use ZMQ. Closes #6359.
+ o Fix time comparison between local and UTC times that caused the VPN
+ certificates not being correctly downloaded on time. Closes #6994.
+ o Add a HTTPClient the twisted way.
+
+0.3.10 Jan 26, 2015:
+ o Consider different possibilities for tmpdir. Related to #6631.
+ o Add support for deferreds to memoize_method decorator
+ o Extract the environment set up and tear down for tests
+
0.3.9 Jul 18, 2014:
o Include pemfile in the package data. Closes #5897.
o Look for bundled cacert.pem in the Resources dir for OSX.
diff --git a/README.rst b/README.rst
index 9164054..9fcf788 100644
--- a/README.rst
+++ b/README.rst
@@ -15,7 +15,6 @@ A collection of shared utils used by the several python LEAP subprojects.
Library dependencies
--------------------
-* ``protobuf-compiler``
* ``libssl-dev``
Python dependencies
diff --git a/pkg/generate_wheels.sh b/pkg/generate_wheels.sh
new file mode 100755
index 0000000..4cdc34e
--- /dev/null
+++ b/pkg/generate_wheels.sh
@@ -0,0 +1,14 @@
+#!/bin/sh
+# Generate wheels for dependencies
+# For convenience, dirspec is allowed with insecure flags enabled.
+# Use at your own risk.
+
+if [ "$WHEELHOUSE" = "" ]; then
+ WHEELHOUSE=$HOME/wheelhouse
+fi
+
+pip wheel --wheel-dir $WHEELHOUSE pip
+pip wheel --wheel-dir $WHEELHOUSE --allow-external dirspec --allow-unverified dirspec -r pkg/requirements.pip
+if [ -f pkg/requirements-testing.pip ]; then
+ pip wheel --wheel-dir $WHEELHOUSE -r pkg/requirements-testing.pip
+fi
diff --git a/pkg/pip_install_requirements.sh b/pkg/pip_install_requirements.sh
new file mode 100755
index 0000000..6d8ed28
--- /dev/null
+++ b/pkg/pip_install_requirements.sh
@@ -0,0 +1,86 @@
+#!/bin/bash
+# Update pip and install LEAP base/testing requirements.
+# For convenience, $insecure_packages are allowed with insecure flags enabled.
+# Use at your own risk.
+# See $usage for help
+
+insecure_packages="dirspec"
+leap_wheelhouse=https://lizard.leap.se/wheels
+
+show_help() {
+ usage="Usage: $0 [--testing] [--use-leap-wheels]\n --testing\t\tInstall dependencies from requirements-testing.pip\n
+\t\t\tOtherwise, it will install requirements.pip\n
+--use-leap-wheels\tUse wheels from leap.se"
+ echo -e $usage
+
+ exit 1
+}
+
+process_arguments() {
+ testing=false
+ use_leap_wheels=false
+
+ while [ "$#" -gt 0 ]; do
+ # From http://stackoverflow.com/a/31443098
+ case "$1" in
+ --help) show_help;;
+ --testing) testing=true; shift 1;;
+ --use-leap-wheels) use_leap_wheels=true; shift 1;;
+
+ -h) show_help;;
+ -*) echo "unknown option: $1" >&2; exit 1;;
+ esac
+ done
+}
+
+return_wheelhouse() {
+ if $use_leap_wheels ; then
+ WHEELHOUSE=$leap_wheelhouse
+ elif [ "$WHEELHOUSE" = "" ]; then
+ WHEELHOUSE=$HOME/wheelhouse
+ fi
+
+ # Tested with bash and zsh
+ if [[ $WHEELHOUSE != http* && ! -d "$WHEELHOUSE" ]]; then
+ mkdir $WHEELHOUSE
+ fi
+
+ echo "$WHEELHOUSE"
+}
+
+return_install_options() {
+ wheelhouse=`return_wheelhouse`
+ install_options="-U --find-links=$wheelhouse"
+ if $use_leap_wheels ; then
+ install_options="$install_options --trusted-host lizard.leap.se"
+ fi
+
+ echo $install_options
+}
+
+return_insecure_flags() {
+ for insecure_package in $insecure_packages; do
+ flags="$flags --allow-external $insecure_package --allow-unverified $insecure_package"
+ done
+
+ echo $flags
+}
+
+return_packages() {
+ if $testing ; then
+ packages="-r pkg/requirements-testing.pip"
+ else
+ packages="-r pkg/requirements.pip"
+ fi
+
+ echo $packages
+}
+
+process_arguments $@
+install_options=`return_install_options`
+insecure_flags=`return_insecure_flags`
+packages=`return_packages`
+
+pip install -U wheel
+pip install $install_options pip
+pip install $install_options $insecure_flags $packages
diff --git a/pkg/requirements-testing.pip b/pkg/requirements-testing.pip
new file mode 100644
index 0000000..c5a3ad0
--- /dev/null
+++ b/pkg/requirements-testing.pip
@@ -0,0 +1,3 @@
+mock
+setuptools-trial
+pep8
diff --git a/pkg/requirements.pip b/pkg/requirements.pip
index c89fd19..02fb189 100644
--- a/pkg/requirements.pip
+++ b/pkg/requirements.pip
@@ -1,8 +1,8 @@
jsonschema #<=0.8 -- are we done with this conflict?
dirspec
-protobuf>=2.4.1
-protobuf.socketrpc
pyopenssl
python-dateutil
+pyzmq>=14.4.1
+txzmq>=0.7.3
#autopep8 -- ???
diff --git a/pkg/tools/get_authors.sh b/pkg/tools/get_authors.sh
new file mode 100755
index 0000000..0169bb1
--- /dev/null
+++ b/pkg/tools/get_authors.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+git log --format='%aN <%aE>' | awk '{arr[$0]++} END{for (i in arr){print arr[i], i;}}' | sort -rn | cut -d' ' -f2-
diff --git a/pkg/utils.py b/pkg/utils.py
index deace14..521cd4e 100644
--- a/pkg/utils.py
+++ b/pkg/utils.py
@@ -58,9 +58,9 @@ def parse_requirements(reqfiles=['requirements.txt',
if re.match(r'\s*-e\s+', line):
pass
# do not try to do anything with externals on vcs
- #requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1',
- #line))
- # http://foo.bar/baz/foobar/zipball/master#egg=foobar
+ # requirements.append(re.sub(r'\s*-e\s+.*#egg=(.*)$', r'\1',
+ # line))
+ # http://foo.bar/baz/foobar/zipball/master#egg=foobar
elif re.match(r'\s*https?:', line):
requirements.append(re.sub(r'\s*https?:.*#egg=(.*)$', r'\1',
line))
diff --git a/setup.cfg b/setup.cfg
index 861a9f5..d929797 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -3,3 +3,13 @@ tag_build =
tag_date = 0
tag_svn_revision = 0
+[aliases]
+test = trial
+
+[pep8]
+exclude = versioneer.py,_version.py,*.egg,build,docs
+ignore = E731
+
+[flake8]
+exclude = versioneer.py,_version.py,*.egg,build,docs
+ignore = E731
diff --git a/setup.py b/setup.py
index a7de8f9..ca83017 100644
--- a/setup.py
+++ b/setup.py
@@ -138,4 +138,11 @@ setup(
tests_require=tests_requirements,
include_package_data=True,
zip_safe=False,
+
+ extras_require={
+ # needed for leap.common.http
+ # service_identity needed for propper hostname identification,
+ # see http://twistedmatrix.com/documents/current/core/howto/ssl.html
+ 'Twisted': ["Twisted>=14.0.2", "service_identity", "zope.interface"]
+ },
)
diff --git a/src/leap/common/__init__.py b/src/leap/common/__init__.py
index 5619900..383e198 100644
--- a/src/leap/common/__init__.py
+++ b/src/leap/common/__init__.py
@@ -4,6 +4,7 @@ from leap.common import certs
from leap.common import check
from leap.common import files
from leap.common import events
+from ._version import get_versions
logger = logging.getLogger(__name__)
@@ -11,11 +12,10 @@ try:
import pygeoip
HAS_GEOIP = True
except ImportError:
- #logger.debug('PyGeoIP not found. Disabled Geo support.')
+ # logger.debug('PyGeoIP not found. Disabled Geo support.')
HAS_GEOIP = False
__all__ = ["certs", "check", "files", "events"]
-from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
diff --git a/src/leap/common/_version.py b/src/leap/common/_version.py
index e926143..3500492 100644
--- a/src/leap/common/_version.py
+++ b/src/leap/common/_version.py
@@ -1,13 +1,3 @@
-# This file was generated by the `freeze_debianver` command in setup.py
-# Using 'versioneer.py' (0.7+) from
-# revision-control system data, or from the parent directory name of an
-# unpacked source archive. Distribution tarballs contain a pre-generated copy
-# of this file.
-
version_version = '0.3.9'
version_full = '3de1eeba83d50793283d65ba4566dd1611ee9d4b'
-
-
-def get_versions(default={}, verbose=False):
- return {'version': version_version, 'full': version_full}
diff --git a/src/leap/common/ca_bundle.py b/src/leap/common/ca_bundle.py
index d8c72a6..2c41d18 100644
--- a/src/leap/common/ca_bundle.py
+++ b/src/leap/common/ca_bundle.py
@@ -28,6 +28,7 @@ _system = platform.system()
IS_MAC = _system == "Darwin"
+
def where():
"""
Return the preferred certificate bundle.
diff --git a/src/leap/common/certs.py b/src/leap/common/certs.py
index 4fe563b..37ede8e 100644
--- a/src/leap/common/certs.py
+++ b/src/leap/common/certs.py
@@ -128,22 +128,23 @@ def is_valid_pemfile(cert):
return can_load_cert_and_pkey(cert)
-def get_cert_time_boundaries(certfile):
+def get_cert_time_boundaries(certdata):
"""
- Returns the time boundaries for the certificate saved in certfile
+ Return the time boundaries for the given certificate.
+ The returned values are UTC/GMT time.struct_time objects
- :param certfile: path to certificate
- :type certfile: str
+ :param certdata: the certificate contents
+ :type certdata: str
:rtype: tuple (from, to)
"""
- cert = get_cert_from_string(certfile)
+ cert = get_cert_from_string(certdata)
leap_assert(cert, 'There was a problem loading the certificate')
fromts, tots = (cert.get_notBefore(), cert.get_notAfter())
- from_, to_ = map(
- lambda ts: time.gmtime(time.mktime(dateparse(ts).timetuple())),
- (fromts, tots))
+ from_ = dateparse(fromts).timetuple()
+ to_ = dateparse(tots).timetuple()
+
return from_, to_
@@ -177,3 +178,21 @@ def should_redownload(certfile, now=time.gmtime):
return True
return False
+
+
+def get_compatible_ssl_context_factory(cert_path=None):
+ import twisted
+ cert = None
+ if twisted.version.base() > '14.0.1':
+ from twisted.web.client import BrowserLikePolicyForHTTPS
+ from twisted.internet import ssl
+ if cert_path:
+ cert = ssl.Certificate.loadPEM(open(cert_path).read())
+ policy = BrowserLikePolicyForHTTPS(cert)
+ return policy
+ else:
+ raise Exception(("""
+ Twisted 14.0.2 is needed in order to have secure
+ Client Web SSL Contexts, not %s
+ See: http://twistedmatrix.com/trac/ticket/7647
+ """) % (twisted.version.base()))
diff --git a/src/leap/common/config/pluggableconfig.py b/src/leap/common/config/pluggableconfig.py
index 8535fa6..1a98427 100644
--- a/src/leap/common/config/pluggableconfig.py
+++ b/src/leap/common/config/pluggableconfig.py
@@ -27,7 +27,7 @@ import urlparse
import jsonschema
-#from leap.base.util.translations import LEAPTranslatable
+# from leap.base.util.translations import LEAPTranslatable
from leap.common.check import leap_assert
@@ -163,8 +163,8 @@ class TranslatableType(object):
return data # LEAPTranslatable(data)
# needed? we already have an extended dict...
- #def get_prep_value(self, data):
- #return dict(data)
+ # def get_prep_value(self, data):
+ # return dict(data)
class URIType(object):
@@ -283,9 +283,13 @@ class PluggableConfig(object):
except BaseException, e:
raise TypeCastException(
"Could not coerce %s, %s, "
- "to format %s: %s" % (key, value,
- _ftype.__class__.__name__,
- e))
+ "to format %s: %s" % (
+ key,
+ value,
+ _ftype.__class__.__name__,
+ e
+ )
+ )
return config
@@ -303,9 +307,12 @@ class PluggableConfig(object):
except BaseException, e:
raise TypeCastException(
"Could not serialize %s, %s, "
- "by format %s: %s" % (key, value,
- _ftype.__class__.__name__,
- e))
+ "by format %s: %s" % (
+ key,
+ value,
+ _ftype.__class__.__name__,
+ e)
+ )
else:
config[key] = value
return config
@@ -435,7 +442,7 @@ class PluggableConfig(object):
content = self.deserialize(string)
if not string and fromfile is not None:
- #import ipdb;ipdb.set_trace()
+ # import ipdb;ipdb.set_trace()
content = self.deserialize(fromfile=fromfile)
if not content:
diff --git a/src/leap/common/config/tests/test_baseconfig.py b/src/leap/common/config/tests/test_baseconfig.py
index 8bdf4d0..e17e82d 100644
--- a/src/leap/common/config/tests/test_baseconfig.py
+++ b/src/leap/common/config/tests/test_baseconfig.py
@@ -29,21 +29,21 @@ from mock import Mock
# reduced eipconfig sample config
sample_config = {
"gateways": [
- {
- "capabilities": {
- "adblock": False,
- "transport": ["openvpn"],
- "user_ips": False
- },
- "host": "host.dev.example.org",
- }, {
- "capabilities": {
- "adblock": False,
- "transport": ["openvpn"],
- "user_ips": False
- },
- "host": "host2.dev.example.org",
- }
+ {
+ "capabilities": {
+ "adblock": False,
+ "transport": ["openvpn"],
+ "user_ips": False
+ },
+ "host": "host.dev.example.org",
+ }, {
+ "capabilities": {
+ "adblock": False,
+ "transport": ["openvpn"],
+ "user_ips": False
+ },
+ "host": "host2.dev.example.org",
+ }
],
"default_language": "en",
"languages": [
diff --git a/src/leap/common/decorators.py b/src/leap/common/decorators.py
index 2ef6711..4b07ea9 100644
--- a/src/leap/common/decorators.py
+++ b/src/leap/common/decorators.py
@@ -22,6 +22,13 @@ import datetime
import functools
import logging
+try:
+ from twisted.internet import defer
+except ImportError:
+ class defer:
+ class Deferred:
+ pass
+
logger = logging.getLogger(__name__)
@@ -59,6 +66,7 @@ class _memoized(object):
# consume a huge amount of memory.
self.cache = {}
self.cache_ts = {}
+ self.is_deferred = {}
def __call__(self, *args, **kwargs):
"""
@@ -67,28 +75,7 @@ class _memoized(object):
:tyoe args: tuple
:type kwargs: dict
"""
- def ret_or_raise(value):
- """
- Returns the value except if it is an exception,
- in which case it's raised.
- """
- if isinstance(value, Exception):
- raise value
- return value
-
- if self.is_method:
- # forget about `self` as key
- key_args = args[1:]
- else:
- key_args = args
-
- if self.ignore_kwargs is True:
- key = key_args
- else:
- key = (key_args, frozenset(
- [(k, v) for k, v in kwargs.items()
- if k not in self.ignore_kwargs]))
-
+ key = self._build_key(*args, **kwargs)
if not isinstance(key, collections.Hashable):
# uncacheable. a list, for instance.
# better to not cache than blow up.
@@ -97,9 +84,9 @@ class _memoized(object):
if key in self.cache:
if self._is_cache_still_valid(key):
- value = self.cache[key]
logger.debug("Got value from cache...")
- return ret_or_raise(value)
+ value = self._get_value(key)
+ return self._ret_or_raise(value)
else:
logger.debug("Cache is invalid, evaluating again...")
@@ -109,9 +96,52 @@ class _memoized(object):
except Exception as exc:
logger.error("Exception while calling function: %r" % (exc,))
value = exc
- self.cache[key] = value
+
+ if isinstance(value, defer.Deferred):
+ value.addBoth(self._store_deferred_value(key))
+ else:
+ self.cache[key] = value
+ self.is_deferred[key] = False
self.cache_ts[key] = datetime.datetime.now()
- return ret_or_raise(value)
+ return self._ret_or_raise(value)
+
+ def _build_key(self, *args, **kwargs):
+ """
+ Build key from the function arguments
+ """
+ if self.is_method:
+ # forget about `self` as key
+ key_args = args[1:]
+ else:
+ key_args = args
+
+ if self.ignore_kwargs is True:
+ key = key_args
+ else:
+ key = (key_args, frozenset(
+ [(k, v) for k, v in kwargs.items()
+ if k not in self.ignore_kwargs]))
+ return key
+
+ def _get_value(self, key):
+ """
+ Get a value form cache for a key
+ """
+ if self.is_deferred[key]:
+ value = self.cache[key]
+ # it produces an errback if value is Failure
+ return defer.succeed(value)
+ else:
+ return self.cache[key]
+
+ def _ret_or_raise(self, value):
+ """
+ Returns the value except if it is an exception,
+ in which case it's raised.
+ """
+ if isinstance(value, Exception):
+ raise value
+ return value
def _is_cache_still_valid(self, key, now=datetime.datetime.now):
"""
@@ -131,6 +161,16 @@ class _memoized(object):
delta = datetime.timedelta(seconds=self.CACHE_INVALIDATION_DELTA)
return (now() - cached_ts) < delta
+ def _store_deferred_value(self, key):
+ """
+ Returns a callback to store values from deferreds
+ """
+ def callback(value):
+ self.cache[key] = value
+ self.is_deferred[key] = True
+ return value
+ return callback
+
def __repr__(self):
"""
Return the function's docstring.
@@ -144,17 +184,20 @@ class _memoized(object):
return functools.partial(self.__call__, obj)
-def memoized_method(function=None, ignore_kwargs=None):
+def memoized_method(function=None, ignore_kwargs=None,
+ invalidation=_memoized.CACHE_INVALIDATION_DELTA):
"""
Wrap _memoized to allow for deferred calling
:type function: callable, or None.
:type ignore_kwargs: None, True or tuple.
+ :type invalidation: int seconds.
"""
if function:
- return _memoized(function, is_method=True)
+ return _memoized(function, is_method=True, invalidation=invalidation)
else:
def wrapper(function):
return _memoized(
- function, ignore_kwargs=ignore_kwargs, is_method=True)
+ function, ignore_kwargs=ignore_kwargs, is_method=True,
+ invalidation=invalidation)
return wrapper
diff --git a/src/leap/common/events/README.rst b/src/leap/common/events/README.rst
index 2e7f254..f455cc8 100644
--- a/src/leap/common/events/README.rst
+++ b/src/leap/common/events/README.rst
@@ -1,19 +1,83 @@
Events mechanism
================
-The events mechanism allows for clients to send signal events to each
-other by means of a centralized server. Clients can register with the
-server to receive signals of certain types, and they can also send signals to
-the server that will then redistribute these signals to registered clients.
+The events mechanism allows for clients to send events to each other by means
+of a centralized server. Clients can register with the server to receive
+events of certain types, and they can also send events to the server that will
+then redistribute these events to registered clients.
-Listening daemons
------------------
+ZMQ connections and events redistribution
+-----------------------------------------
-Both clients and the server listen for incoming messages by using a
-listening daemon that runs in its own thread. The server daemon has to be
-started explicitly, while clients daemon will be started whenever a
-client registers with the server to receive messages.
+Clients and server use ZMQ connection patterns to communicate. Clients can
+push events to the server, and may subscribe to events published by the
+server. The server in turn pulls events from clients and publishes them to
+subscribed clients.
+
+Clients connect to the server's zmq pub socket, and register to specific
+events indicating which callbacks should be executed when that event is
+received:
+
+
+ EventsServer
+ .------------.
+ |PULL PUB|
+ '------------'
+ ^^
+ ||
+ reg(1, cbk1) |'--------------. reg(2, cbk2)
+ | |
+ | |
+ .------------. .------------. .------------.
+ |PUSH SUB| |PUSH SUB| |PUSH SUB|
+ '------------' '------------' '------------'
+ EventsClient EventsClient EventsClient
+
+
+A client that wants to send an event connects to the server's zmq pull socket
+and pushes the event to the server. The server then redistributes it to all
+clients subscribed to that event.
+
+
+ EventsServer
+ .------------.
+ |PULL---->PUB|
+ '------------'
+ ^ |.
+ | |.
+sig(1, 'foo') .----------------' |'...............
+ | | .
+ | v .
+ .------------. .------------. .------------.
+ |PUSH SUB| |PUSH SUB| |PUSH SUB|
+ '------------' '------------' '------------'
+ EventsClient EventsClient EventsClient
+ |
+ v
+ cbk1(1, 'foo')
+
+
+Any client may emit or subscribe to an event. ZMQ will manage sockets and
+reuse the connection whenever it can.
+
+
+ EventsServer
+ .------------.
+ |PULL---->PUB|
+ '------------'
+ ^ .|
+ | .|
+sig(2, 'bar') .-----------------' .'--------------.
+ | . |
+ | . v
+ .------------. .------------. .------------.
+ |PUSH SUB| |PUSH SUB| |PUSH SUB|
+ '------------' '------------' '------------'
+ EventsClient EventsClient EventsClient
+ |
+ v
+ cbk2(2, 'bar')
How to use it
@@ -22,32 +86,27 @@ How to use it
To start the events server:
>>> from leap.common.events import server
->>> server.ensure_server(port=8090)
+>>> server.ensure_server(
+ emit_addr="tcp://127.0.0.1:9000",
+ reg_addr="tcp://127.0.0.1:9001")
-To register a callback to be called when a given signal is raised:
+To register a callback to be called when a given event is raised:
->>> from leap.common.events import (
->>> register,
->>> events_pb2 as proto,
->>> )
+>>> from leap.common.events import register
+>>> from leap.common.events import catalog
>>>
->>> def mycallback(sigreq):
->>> print str(sigreq)
+>>> def mycbk(event, *content):
+>>> print "%s, %s" (str(event), str(content))
>>>
->>> events.register(signal=proto.CLIENT_UID, callback=mycallback)
+>>> register(catalog.CLIENT_UID, callback=mycbk)
-To signal an event:
+To emit an event:
->>> from leap.common.events import (
->>> signal,
->>> events_pb2 as proto,
->>> )
->>> signal(proto.CLIENT_UID)
+>>> from leap.common.events import emit
+>>> from leap.common.events import catalog
+>>> emit(catalog.CLIENT_UID)
Adding events
-------------
-* Add the new event under enum ``Event`` in ``events.proto``
-* Compile the new protocolbuffers file::
-
- make
+To add a new event, just add it to ``catalog.py``.
diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py
index 0cc6573..87ed8ae 100644
--- a/src/leap/common/events/__init__.py
+++ b/src/leap/common/events/__init__.py
@@ -14,189 +14,199 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
"""
-This is an events mechanism that uses a server to allow for signaling of
-events between clients.
+This is an events mechanism that uses a server to allow for emitting events
+between clients.
Application components should use the interface available in this file to
-register callbacks to be executed upon receiving specific signals, and to send
-signals to other components.
+register callbacks to be executed upon receiving specific events, and to send
+events to other components.
-To register a callback to be executed when a specific event is signaled, use
+To register a callback to be executed when a specific event is emitted, use
leap.common.events.register():
>>> from leap.common.events import register
->>> from leap.common.events import events_pb2 as proto
->>> register(proto.CLIENT_UID, lambda req: do_something(req))
-
-To signal an event, use leap.common.events.signal():
-
->>> from leap.common.events import signal
->>> from leap.common.events import events_pb2 as proto
->>> signal(proto.CLIENT_UID)
-
-
-NOTE ABOUT SYNC/ASYNC REQUESTS:
-
-Clients always communicate with the server, and never between themselves.
-When a client registers a callback for an event, the callback is saved locally
-in the client and the server stores the client socket port in a list
-associated with that event. When a client signals an event, the server
-forwards the signal to all registered client ports, and then each client
-executes its callbacks associated with that event locally.
-
-Each RPC call from a client to the server is followed by a response from the
-server to the client. Calls to register() and signal() (and all other RPC
-calls) can be synchronous or asynchronous meaning if they will or not wait for
-the server's response before returning.
-
-This mechanism was built on top of protobuf.socketrpc, and because of this RPC
-calls are made synchronous or asynchronous in the following way:
-
- * If RPC calls receive a parameter called `reqcbk`, then the call is made
- asynchronous. That means that:
-
- - an eventual `timeout` parameter is not used,
- - the call returns immediatelly with value None, and
- - the `reqcbk` callback is executed asynchronously upon the arrival of
- a response from the server.
+>>> from leap.common.events import catalog
+>>> register(catalog.CLIENT_UID, lambda sig, content: do_something(content))
- * Otherwise, if the `reqcbk` parameter is None, then the call is made in a
- synchronous manner:
+To emit an event, use leap.common.events.emit():
- - if a response from server arrives within `timeout` milliseconds, the
- RPC call returns it;
- - otherwise, the call returns None.
+>>> from leap.common.events import emit
+>>> from leap.common.events import catalog
+>>> emit(catalog.CLIENT_UID)
"""
-
import logging
-import socket
+import argparse
+
+from leap.common.events import client
+from leap.common.events import server
+from leap.common.events.flags import set_events_enabled
+
+from leap.common.events import catalog
-from leap.common.events import (
- events_pb2 as proto,
- server,
- client,
- daemon,
-)
+__all__ = [
+ "register",
+ "unregister",
+ "emit",
+ "catalog",
+ "set_events_enabled"
+]
logger = logging.getLogger(__name__)
-def register(signal, callback, uid=None, replace=False, reqcbk=None,
- timeout=1000):
+def register(event, callback, uid=None, replace=False):
"""
- Register a callback to be called when the given signal is received.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param callback: the callback to be called when the signal is received
- :type callback: function
- :param uid: a unique id for the callback
- :type uid: int
- :param replace: should an existent callback with same uid be replaced?
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param callback: The callback to be executed.
+ :type callback: callable(event, content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
:type replace: bool
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(leap.common.events.events_pb2.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- return client.register(signal, callback, uid, replace, reqcbk, timeout)
+ :return: The callback uid.
+ :rtype: str
-def unregister(signal, uid=None, reqcbk=None, timeout=1000):
- """
- Unregister a callback.
-
- If C{uid} is specified, unregisters only the callback identified by that
- unique id. Otherwise, unregisters all callbacks registered for C{signal}.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param uid: a unique id for the callback
- :type uid: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
+ :raises CallbackAlreadyRegistered: when there's already a callback
+ identified by the given uid and replace is False.
"""
- return client.unregister(signal, uid, reqcbk, timeout)
+ return client.register(event, callback, uid, replace)
-def signal(signal, content="", mac_method="", mac="", reqcbk=None,
- timeout=1000):
+def unregister(event, uid=None):
"""
- Send `signal` event to events server.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param content: the contents of the event signal
- :type content: str
- :param mac_method: the method used to auth mac
- :type mac_method: str
- :param mac: the content of the auth mac
- :type mac: str
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.SignalRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- return client.signal(signal, content, mac_method, mac, reqcbk, timeout)
+ Unregister callbacks for an event.
-def ping_client(port, reqcbk=None, timeout=1000):
- """
- Ping a client running in C{port}.
-
- :param port: the port in which the client should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from client is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
+ If uid is not None, then only the callback identified by the given uid is
+ removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param uid: The callback uid.
+ :type uid: str
"""
- return client.ping(port, reqcbk=reqcbk, timeout=timeout)
+ return client.unregister(event, uid)
-def ping_server(port=server.SERVER_PORT, reqcbk=None, timeout=1000):
+def emit(event, *content):
"""
- Ping the server.
-
- :param port: the port in which server should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
"""
- return server.ping(port, reqcbk=reqcbk, timeout=timeout)
+ return client.emit(event, *content)
+
+
+if __name__ == "__main__":
+
+ def _echo(event, *content):
+ print "Received event: (%s, %s)" % (event, content)
+
+ def _parse_args():
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--debug", "-d", action="store_true",
+ help="print debug information")
+
+ subparsers = parser.add_subparsers(dest="command")
+
+ # server options
+ server_parser = subparsers.add_parser(
+ "server", help="Run an events server.")
+ server_parser.add_argument(
+ "--emit-addr",
+ help="The address in which to listen for events",
+ default=server.EMIT_ADDR)
+ server_parser.add_argument(
+ "--reg-addr",
+ help="The address in which to listen for registration for events.",
+ default=server.REG_ADDR)
+
+ # client options
+ client_parser = subparsers.add_parser(
+ "client", help="Run an events client.")
+ client_parser.add_argument(
+ "--emit-addr",
+ help="The address in which to emit events.",
+ default=server.EMIT_ADDR)
+ client_parser.add_argument(
+ "--reg-addr",
+ help="The address in which to register for events.",
+ default=server.REG_ADDR)
+ group = client_parser.add_mutually_exclusive_group(required=True)
+ group.add_argument('--reg', help="register an event")
+ group.add_argument('--emit', help="send an event")
+ client_parser.add_argument(
+ '--content', help="the content of the event", default=None)
+
+ # txclient options
+ txclient_parser = subparsers.add_parser(
+ "txclient", help="Run an events twisted client.")
+ txclient_parser.add_argument(
+ "--emit-addr",
+ help="The address in which to emit events.",
+ default=server.EMIT_ADDR)
+ txclient_parser.add_argument(
+ "--reg-addr",
+ help="The address in which to register for events.",
+ default=server.REG_ADDR)
+ group = txclient_parser.add_mutually_exclusive_group(required=True)
+ group.add_argument('--reg', help="register an event")
+ group.add_argument('--emit', help="send an event")
+ txclient_parser.add_argument(
+ '--content', help="the content of the event", default=None)
+
+ return parser.parse_args()
+
+ args = _parse_args()
+
+ if args.debug:
+ logging.basicConfig(level=logging.DEBUG)
+
+ if args.command == "server":
+ # run server
+ server.ensure_server(emit_addr=args.emit_addr, reg_addr=args.reg_addr)
+ from twisted.internet import reactor
+ reactor.run()
+ elif args.command == "client":
+ if args.reg:
+ event = getattr(catalog, args.reg)
+ # run client and register to a signal
+ register(event, _echo)
+ # make sure we stop on CTRL+C
+ import signal
+ signal.signal(
+ signal.SIGINT, lambda sig, frame: client.shutdown())
+ # wait until client thread dies
+ import time
+ while client.EventsClientThread.instance().is_alive():
+ time.sleep(0.1)
+ if args.emit:
+ # run client and emit a signal
+ event = getattr(catalog, args.emit)
+ emit(event, args.content)
+ client.shutdown()
+ elif args.command == "txclient":
+ from leap.common.events import txclient
+ register = txclient.register
+ emit = txclient.emit
+ if args.reg:
+ event = getattr(catalog, args.reg)
+ # run client and register to a signal
+ register(event, _echo)
+ from twisted.internet import reactor
+ reactor.run()
+ if args.emit:
+ # run client and emit a signal
+ event = getattr(catalog, args.emit)
+ emit(event, args.content)
diff --git a/src/leap/common/events/catalog.py b/src/leap/common/events/catalog.py
new file mode 100644
index 0000000..8bddd2c
--- /dev/null
+++ b/src/leap/common/events/catalog.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+# catalog.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful",
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+Events catalog.
+"""
+
+
+EVENTS = [
+ "CLIENT_SESSION_ID",
+ "CLIENT_UID",
+ "IMAP_CLIENT_LOGIN",
+ "IMAP_SERVICE_FAILED_TO_START",
+ "IMAP_SERVICE_STARTED",
+ "IMAP_UNHANDLED_ERROR",
+ "KEYMANAGER_DONE_UPLOADING_KEYS",
+ "KEYMANAGER_FINISHED_KEY_GENERATION",
+ "KEYMANAGER_KEY_FOUND",
+ "KEYMANAGER_KEY_NOT_FOUND",
+ "KEYMANAGER_LOOKING_FOR_KEY",
+ "KEYMANAGER_STARTED_KEY_GENERATION",
+ "MAIL_FETCHED_INCOMING",
+ "MAIL_MSG_DECRYPTED",
+ "MAIL_MSG_DELETED_INCOMING",
+ "MAIL_MSG_PROCESSING",
+ "MAIL_MSG_SAVED_LOCALLY",
+ "MAIL_UNREAD_MESSAGES",
+ "RAISE_WINDOW",
+ "SMTP_CONNECTION_LOST",
+ "SMTP_END_ENCRYPT_AND_SIGN",
+ "SMTP_END_SIGN",
+ "SMTP_RECIPIENT_ACCEPTED_ENCRYPTED",
+ "SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED",
+ "SMTP_RECIPIENT_REJECTED",
+ "SMTP_SEND_MESSAGE_ERROR",
+ "SMTP_SEND_MESSAGE_START",
+ "SMTP_SEND_MESSAGE_SUCCESS",
+ "SMTP_SERVICE_FAILED_TO_START",
+ "SMTP_SERVICE_STARTED",
+ "SMTP_START_ENCRYPT_AND_SIGN",
+ "SMTP_START_SIGN",
+ "SOLEDAD_CREATING_KEYS",
+ "SOLEDAD_DONE_CREATING_KEYS",
+ "SOLEDAD_DONE_DATA_SYNC",
+ "SOLEDAD_DONE_DOWNLOADING_KEYS",
+ "SOLEDAD_DONE_UPLOADING_KEYS",
+ "SOLEDAD_DOWNLOADING_KEYS",
+ "SOLEDAD_INVALID_AUTH_TOKEN",
+ "SOLEDAD_NEW_DATA_TO_SYNC",
+ "SOLEDAD_SYNC_RECEIVE_STATUS",
+ "SOLEDAD_SYNC_SEND_STATUS",
+ "SOLEDAD_UPLOADING_KEYS",
+ "UPDATER_DONE_UPDATING",
+ "UPDATER_NEW_UPDATES",
+]
+
+
+class Event(object):
+
+ def __init__(self, label):
+ self.label = label
+
+ def __repr__(self):
+ return '<Event: %s>' % self.label
+
+ def __str__(self):
+ return self.label
+
+
+# create local variables based on the event list above
+lcl = locals()
+for event in EVENTS:
+ lcl[event] = Event(event)
diff --git a/src/leap/common/events/client.py b/src/leap/common/events/client.py
index 83f18e0..e085f5b 100644
--- a/src/leap/common/events/client.py
+++ b/src/leap/common/events/client.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# client.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014, 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -21,309 +21,532 @@ Clients are the communicating parties of the events mechanism. They
communicate by sending messages to a server, which in turn redistributes
messages to other clients.
-When a client registers a callback for a given signal, it also tells the
-server that it wants to be notified whenever signals of that type are sent by
+When a client registers a callback for a given event, it also tells the
+server that it wants to be notified whenever events of that type are sent by
some other client.
"""
-
import logging
+import collections
+import uuid
+import threading
+import time
+import pickle
+import os
+
+from abc import ABCMeta
+from abc import abstractmethod
+
+import zmq
+from zmq.eventloop import zmqstream
+from zmq.eventloop import ioloop
+
+# XXX some distros don't package libsodium, so we have to be prepared for
+# absence of zmq.auth
+try:
+ import zmq.auth
+except ImportError:
+ pass
+from leap.common.config import get_path_prefix
+from leap.common.zmq_utils import zmq_has_curve
+from leap.common.zmq_utils import maybe_create_and_get_certificates
+from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
-from protobuf.socketrpc import RpcService
-
-from leap.common.events import events_pb2 as proto
-from leap.common.events import server
-from leap.common.events import daemon
-from leap.common.events import mac_auth
+from leap.common.events.errors import CallbackAlreadyRegisteredError
+from leap.common.events.server import EMIT_ADDR
+from leap.common.events.server import REG_ADDR
+from leap.common.events import flags
+from leap.common.events import catalog
logger = logging.getLogger(__name__)
-# the `registered_callbacks` dictionary below should have the following
-# format:
-#
-# { event_signal: [ (uid, callback), ... ], ... }
-#
-registered_callbacks = {}
+_emit_addr = EMIT_ADDR
+_reg_addr = REG_ADDR
+
+def configure_client(emit_addr, reg_addr):
+ global _emit_addr, _reg_addr
+ logger.debug("Configuring client with addresses: (%s, %s)" %
+ (emit_addr, reg_addr))
+ _emit_addr = emit_addr
+ _reg_addr = reg_addr
-class CallbackAlreadyRegistered(Exception):
+
+class EventsClient(object):
"""
- Raised when trying to register an already registered callback.
+ A singleton client for the events mechanism.
"""
- pass
+ __metaclass__ = ABCMeta
-def ensure_client_daemon():
- """
- Ensure the client daemon is running and listening for incoming
- messages.
+ _instance = None
+ _instance_lock = threading.Lock()
- :return: the daemon instance
- :rtype: EventsClientDaemon
- """
- import time
- daemon = EventsClientDaemon.ensure(0)
- logger.debug('ensure client daemon')
+ def __init__(self, emit_addr, reg_addr):
+ """
+ Initialize the events client.
+ """
+ logger.debug("Creating client instance.")
+ self._callbacks = collections.defaultdict(dict)
+ self._emit_addr = emit_addr
+ self._reg_addr = reg_addr
- # Because we use a random port we want to wait until a port is assigned to
- # local client daemon.
+ @property
+ def callbacks(self):
+ return self._callbacks
- while not (EventsClientDaemon.get_instance() and
- EventsClientDaemon.get_instance().get_port()):
- time.sleep(0.1)
- return daemon
+ @classmethod
+ def instance(cls):
+ """
+ Return a singleton EventsClient instance.
+ """
+ with cls._instance_lock:
+ if cls._instance is None:
+ cls._instance = cls(_emit_addr, _reg_addr)
+ return cls._instance
+ def register(self, event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param callback: The callback to be executed.
+ :type callback: callable(event, *content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a callback
+ identified by the given uid and replace is False.
+ """
+ logger.debug("Subscribing to event: %s" % event)
+ if not uid:
+ uid = uuid.uuid4()
+ elif uid in self._callbacks[event] and not replace:
+ raise CallbackAlreadyRegisteredError()
+ self._callbacks[event][uid] = callback
+ self._subscribe(str(event))
+ return uid
+
+ def unregister(self, event, uid=None):
+ """
+ Unregister callbacks for an event.
-def register(signal, callback, uid=None, replace=False, reqcbk=None,
- timeout=1000):
- """
- Registers a callback to be called when a specific signal event is
- received.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param callback: the callback to be called when the signal is received
- :type callback: function(leap.common.events.events_pb2.SignalRequest)
- :param uid: a unique id for the callback
- :type uid: int
- :param replace: should an existent callback with same uid be replaced?
- :type replace: bool
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.RegisterRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- Might raise a CallbackAlreadyRegistered exception if there's already a
- callback identified by the given uid and replace is False.
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- ensure_client_daemon() # so we can receive registered signals
- # register callback locally
- if signal not in registered_callbacks:
- registered_callbacks[signal] = []
- cbklist = registered_callbacks[signal]
-
- # TODO should check that the callback has the right
- # number of arguments.
-
- if uid and filter(lambda (x, y): x == uid, cbklist):
- if not replace:
- raise CallbackAlreadyRegistered()
+ If uid is not None, then only the callback identified by the given uid
+ is removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ if not uid:
+ logger.debug(
+ "Unregistering all callbacks from event %s." % event)
+ self._callbacks[event] = {}
else:
- registered_callbacks[signal] = filter(lambda(x, y): x != uid,
- cbklist)
- registered_callbacks[signal].append((uid, callback))
- # register callback on server
- request = proto.RegisterRequest()
- request.event = signal
- request.port = EventsClientDaemon.get_instance().get_port()
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(proto.EventsServerService_Stub,
- server.SERVER_PORT, 'localhost')
- logger.debug(
- "Sending registration request to server on port %s: %s",
- server.SERVER_PORT,
- str(request)[:40])
- return service.register(request, callback=reqcbk, timeout=timeout)
-
-
-def unregister(signal, uid=None, reqcbk=None, timeout=1000):
- """
- Unregister a callback.
-
- If C{uid} is specified, unregisters only the callback identified by that
- unique id. Otherwise, unregisters all callbacks
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param uid: a unique id for the callback
- :type uid: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.UnregisterRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls or None if no callback is registered for that signal or
- uid.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- if signal not in registered_callbacks or not registered_callbacks[signal]:
- logger.warning("No callback registered for signal %d." % signal)
- return None
- # unregister callback locally
- cbklist = registered_callbacks[signal]
- if uid is not None:
- if filter(lambda (cbkuid, _): cbkuid == uid, cbklist) == []:
- logger.warning("No callback registered for uid %d." % st)
- return None
- registered_callbacks[signal] = filter(lambda(x, y): x != uid, cbklist)
- else:
- # exclude all callbacks for given signal
- registered_callbacks[signal] = []
- # unregister port in server if there are no more callbacks for this signal
- if not registered_callbacks[signal]:
- request = proto.UnregisterRequest()
- request.event = signal
- request.port = EventsClientDaemon.get_instance().get_port()
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(proto.EventsServerService_Stub,
- server.SERVER_PORT, 'localhost')
- logger.info(
- "Sending unregistration request to server on port %s: %s",
- server.SERVER_PORT,
- str(request)[:40])
- return service.unregister(request, callback=reqcbk, timeout=timeout)
-
-
-def signal(signal, content="", mac_method="", mac="", reqcbk=None,
- timeout=1000):
- """
- Send `signal` event to events server.
-
- Will timeout after timeout ms if response has not been received. The
- timeout arg is only used for asynch requests. If a reqcbk callback has
- been supplied the timeout arg is not used. The response value will be
- returned for a synch request but nothing will be returned for an asynch
- request.
-
- :param signal: the signal that causes the callback to be launched
- :type signal: int (see the `events.proto` file)
- :param content: the contents of the event signal
- :type content: str
- :param mac_method: the method used for auth mac
- :type mac_method: str
- :param mac: the content of the auth mac
- :type mac: str
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.SignalRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- request = proto.SignalRequest()
- request.event = signal
- request.content = content
- request.mac_method = mac_method
- request.mac = mac
- service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT,
- 'localhost')
- logger.debug("Sending signal to server: %s", str(request)[:40])
- return service.signal(request, callback=reqcbk, timeout=timeout)
-
-
-def ping(port, reqcbk=None, timeout=1000):
+ logger.debug(
+ "Unregistering callback %s from event %s." % (uid, event))
+ if uid in self._callbacks[event]:
+ del self._callbacks[event][uid]
+ if not self._callbacks[event]:
+ del self._callbacks[event]
+ self._unsubscribe(str(event))
+
+ def emit(self, event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ logger.debug("Emitting event: (%s, %s)" % (event, content))
+ payload = str(event) + b'\0' + pickle.dumps(content)
+ self._send(payload)
+
+ def _handle_event(self, event, content):
+ """
+ Handle an incoming event.
+
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ logger.debug("Handling event %s..." % event)
+ for uid in self._callbacks[event]:
+ callback = self._callbacks[event][uid]
+ logger.debug("Executing callback %s." % uid)
+ self._run_callback(callback, event, content)
+
+ @abstractmethod
+ def _run_callback(self, callback, event, content):
+ """
+ Run a callback.
+
+ :param callback: The callback to be run.
+ :type callback: callable(event, *content)
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ pass
+
+ @abstractmethod
+ def _subscribe(self, tag):
+ """
+ Subscribe to a tag on the zmq SUB socket.
+
+ :param tag: The tag to be subscribed.
+ :type tag: str
+ """
+ pass
+
+ @abstractmethod
+ def _unsubscribe(self, tag):
+ """
+ Unsubscribe from a tag on the zmq SUB socket.
+
+ :param tag: The tag to be unsubscribed.
+ :type tag: str
+ """
+ pass
+
+ @abstractmethod
+ def _send(self, data):
+ """
+ Send data through PUSH socket.
+
+ :param data: The data to be sent.
+ :type event: str
+ """
+ pass
+
+ def shutdown(self):
+ self.__class__.reset()
+
+ @classmethod
+ def reset(cls):
+ with cls._instance_lock:
+ cls._instance = None
+
+
+class EventsIOLoop(ioloop.ZMQIOLoop):
"""
- Ping a client running in C{port}.
-
- :param port: the port in which the client should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from client is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from client for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
+ An extension of zmq's ioloop that can wait until there are no callbacks
+ in the queue before stopping.
"""
- request = proto.PingRequest()
- service = RpcService(
- proto.EventsClientService_Stub,
- port,
- 'localhost')
- logger.debug("Pinging a client in port %d..." % port)
- return service.ping(request, callback=reqcbk, timeout=timeout)
+ def stop(self, wait=False):
+ """
+ Stop the I/O loop.
-class EventsClientService(proto.EventsClientService):
+ :param wait: Whether we should wait for callbacks in queue to finish
+ before stopping.
+ :type wait: bool
+ """
+ if wait:
+ # prevent new callbacks from being added
+ with self._callback_lock:
+ self._closing = True
+ # wait until all callbacks have been executed
+ while self._callbacks:
+ time.sleep(0.1)
+ ioloop.ZMQIOLoop.stop(self)
+
+
+class EventsClientThread(threading.Thread, EventsClient):
"""
- Service for receiving signal events in clients.
+ A threaded version of the events client.
"""
- def __init__(self):
- proto.EventsClientService.__init__(self)
+ def __init__(self, emit_addr, reg_addr):
+ """
+ Initialize the events client.
+ """
+ threading.Thread.__init__(self)
+ EventsClient.__init__(self, emit_addr, reg_addr)
+ self._lock = threading.Lock()
+ self._initialized = threading.Event()
+ self._config_prefix = os.path.join(
+ get_path_prefix(), "leap", "events")
+ self._loop = None
+ self._context = None
+ self._push = None
+ self._sub = None
+
+ def _init_zmq(self):
+ """
+ Initialize ZMQ connections.
+ """
+ self._loop = EventsIOLoop()
+ self._context = zmq.Context()
+ # connect SUB first, otherwise we might miss some event sent from this
+ # same client
+ self._sub = self._zmq_connect_sub()
+ self._push = self._zmq_connect_push()
+
+ def _zmq_connect(self, socktype, address):
+ """
+ Connect to an address using with a zmq socktype.
- def signal(self, controller, request, done):
+ :param socktype: The ZMQ socket type.
+ :type socktype: int
+ :param address: The address to connect to.
+ :type address: str
+
+ :return: A ZMQ connection stream.
+ :rtype: ZMQStream
+ """
+ logger.debug("Connecting %s to %s." % (socktype, address))
+ socket = self._context.socket(socktype)
+ # configure curve authentication
+ if zmq_has_curve():
+ public, private = maybe_create_and_get_certificates(
+ self._config_prefix, "client")
+ server_public_file = os.path.join(
+ self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
+ server_public, _ = zmq.auth.load_certificate(server_public_file)
+ socket.curve_publickey = public
+ socket.curve_secretkey = private
+ socket.curve_serverkey = server_public
+ stream = zmqstream.ZMQStream(socket, self._loop)
+ socket.connect(address)
+ return stream
+
+ def _zmq_connect_push(self):
"""
- Receive a signal and run callbacks registered for that signal.
+ Initialize the client's PUSH connection.
- This method is called whenever a signal request is received from
- server.
+ :return: A ZMQ connection stream.
+ :rtype: ZMQStream
+ """
+ return self._zmq_connect(zmq.PUSH, self._emit_addr)
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.SignalRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
+ def _zmq_connect_sub(self):
"""
- logger.debug('Received signal from server: %s...' % str(request)[:40])
+ Initialize the client's SUB connection.
- # run registered callbacks
- # TODO: verify authentication using mac in incoming message
- if request.event in registered_callbacks:
- for (_, cbk) in registered_callbacks[request.event]:
- # callbacks should be prepared to receive a
- # events_pb2.SignalRequest.
- cbk(request)
+ :return: A ZMQ connection stream.
+ :rtype: ZMQStream
+ """
+ stream = self._zmq_connect(zmq.SUB, self._reg_addr)
+ stream.on_recv(self._on_recv)
+ return stream
- # send response back to server
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
+ def _on_recv(self, msg):
+ """
+ Handle an incoming message in the SUB socket.
- def ping(self, controller, request, done):
+ :param msg: The received message.
+ :type msg: str
"""
- Reply to a ping request.
+ ev_str, content_pickle = msg[0].split(b'\0', 1) # undo txzmq tagging
+ event = getattr(catalog, ev_str)
+ content = pickle.loads(content_pickle)
+ self._handle_event(event, content)
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
+ def _subscribe(self, tag):
"""
- logger.debug("Received ping request, sending response.")
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
+ Subscribe from a tag on the zmq SUB socket.
+ :param tag: The tag to be subscribed.
+ :type tag: str
+ """
+ self._sub.socket.setsockopt(zmq.SUBSCRIBE, tag)
-class EventsClientDaemon(daemon.EventsSingletonDaemon):
- """
- A daemon that listens for incoming events from server.
- """
+ def _unsubscribe(self, tag):
+ """
+ Unsubscribe from a tag on the zmq SUB socket.
- @classmethod
- def ensure(cls, port):
+ :param tag: The tag to be unsubscribed.
+ :type tag: str
+ """
+ self._sub.socket.setsockopt(zmq.UNSUBSCRIBE, tag)
+
+ def _send(self, data):
"""
- Make sure the daemon is running on the given port.
+ Send data through PUSH socket.
- :param port: the port in which the daemon should listen
- :type port: int
+ :param data: The data to be sent.
+ :type event: str
+ """
+ # add send() as a callback for ioloop so it works between threads
+ self._loop.add_callback(lambda: self._push.send(data))
+
+ def _run_callback(self, callback, event, content):
+ """
+ Run a callback.
+
+ :param callback: The callback to be run.
+ :type callback: callable(event, *content)
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ self._loop.add_callback(lambda: callback(event, *content))
- :return: a daemon instance
- :rtype: EventsClientDaemon
+ def register(self, event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param callback: The callback to be executed.
+ :type callback: callable(event, *content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a
+ callback identified by the given uid and replace is False.
+ """
+ self.ensure_client()
+ return EventsClient.register(
+ self, event, callback, uid=uid, replace=replace)
+
+ def unregister(self, event, uid=None):
+ """
+ Unregister callbacks for an event.
+
+ If uid is not None, then only the callback identified by the given uid
+ is removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: Event
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ self.ensure_client()
+ EventsClient.unregister(self, event, uid=uid)
+
+ def emit(self, event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ self.ensure_client()
+ EventsClient.emit(self, event, *content)
+
+ def run(self):
+ """
+ Run the events client.
+ """
+ logger.debug("Starting ioloop.")
+ self._init_zmq()
+ self._initialized.set()
+ self._loop.start()
+ self._loop.close()
+ logger.debug("Ioloop finished.")
+
+ def ensure_client(self):
+ """
+ Make sure the events client thread is started.
"""
- return cls.ensure_service(port, EventsClientService())
+ with self._lock:
+ if flags.EVENTS_ENABLED and not self.is_alive():
+ self.daemon = True
+ self.start()
+ self._initialized.wait()
+
+ def shutdown(self):
+ """
+ Shutdown the events client thread.
+ """
+ logger.debug("Shutting down client...")
+ with self._lock:
+ if self.is_alive():
+ self._loop.stop(wait=True)
+ EventsClient.shutdown(self)
+
+
+def shutdown():
+ """
+ Shutdown the events client thread.
+ """
+ EventsClientThread.instance().shutdown()
+
+
+def register(event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param callback: The callback to be executed.
+ :type callback: callable(event, content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a callback
+ identified by the given uid and replace is False.
+ """
+ if flags.EVENTS_ENABLED:
+ return EventsClientThread.instance().register(
+ event, callback, uid=uid, replace=replace)
+
+
+def unregister(event, uid=None):
+ """
+ Unregister callbacks for an event.
+
+ If uid is not None, then only the callback identified by the given uid is
+ removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ if flags.EVENTS_ENABLED:
+ return EventsClientThread.instance().unregister(event, uid=uid)
+
+
+def emit(event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: str
+ :param content: The content of the event.
+ :type content: list
+ """
+ if flags.EVENTS_ENABLED:
+ return EventsClientThread.instance().emit(event, *content)
+
+
+def instance():
+ """
+ Return an instance of the events client.
+
+ :return: An instance of the events client.
+ :rtype: EventsClientThread
+ """
+ return EventsClientThread.instance()
diff --git a/src/leap/common/events/daemon.py b/src/leap/common/events/daemon.py
deleted file mode 100644
index c4a4189..0000000
--- a/src/leap/common/events/daemon.py
+++ /dev/null
@@ -1,208 +0,0 @@
-# -*- coding: utf-8 -*-
-# daemon.py
-# Copyright (C) 2013 LEAP
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-A singleton daemon for running RPC services using protobuf.socketrpc.
-"""
-
-
-import logging
-import threading
-
-
-from protobuf.socketrpc.server import (
- SocketRpcServer,
- ThreadedTCPServer,
- SocketHandler,
-)
-
-
-logger = logging.getLogger(__name__)
-
-
-class ServiceAlreadyRunningException(Exception):
- """
- Raised whenever a service is already running in this process but someone
- attemped to start it in a different port.
- """
-
-
-class EventsRpcServer(SocketRpcServer):
- """
- RPC server used in server and client interfaces to receive messages.
- """
-
- def __init__(self, port, host='localhost'):
- """
- Initialize a RPC server.
-
- :param port: the port in which to listen for incoming messages
- :type port: int
- :param host: the address to bind to
- :type host: str
- """
- SocketRpcServer.__init__(self, port, host)
- self._server = None
-
- def run(self):
- """
- Run the server.
- """
- logger.info('Running server on port %d.' % self.port)
- # parent implementation does not hold the server instance, so we do it
- # here.
- self._server = ThreadedTCPServer((self.host, self.port),
- SocketHandler, self)
- # if we chose to use a random port, fetch the port number info.
- if self.port is 0:
- self.port = self._server.socket.getsockname()[1]
- self._server.serve_forever()
-
- def stop(self):
- """
- Stop the server.
- """
- self._server.shutdown()
-
-
-class EventsSingletonDaemon(threading.Thread):
- """
- Singleton class for for launching and terminating a daemon.
-
- This class is used so every part of the mechanism that needs to listen for
- messages can launch its own daemon (thread) to do the job.
- """
-
- # Singleton instance
- __instance = None
-
- def __new__(cls, *args, **kwargs):
- """
- Return a singleton instance if it exists or create and initialize one.
- """
- if len(args) is not 2:
- raise TypeError("__init__() takes exactly 2 arguments (%d given)"
- % len(args))
- if cls.__instance is None:
- cls.__instance = object.__new__(
- EventsSingletonDaemon)
- cls.__initialize(cls.__instance, args[0], args[1])
- return cls.__instance
-
- @staticmethod
- def __initialize(self, port, service):
- """
- Initialize a singleton daemon.
-
- This is a static method disguised as instance method that actually
- does the initialization of the daemon instance.
-
- :param port: the port in which to listen for incoming messages
- :type port: int
- :param service: the service to provide in this daemon
- :type service: google.protobuf.service.Service
- """
- threading.Thread.__init__(self)
- self._port = port
- self._service = service
- self._server = EventsRpcServer(self._port)
- self._server.registerService(self._service)
- self.daemon = True
-
- def __init__(self):
- """
- Singleton placeholder initialization method.
-
- Initialization is made in __new__ so we can always return the same
- instance upon object creation.
- """
- pass
-
- @classmethod
- def ensure(cls, port):
- """
- Make sure the daemon instance is running.
-
- Each implementation of this method should call `self.ensure_service`
- with the appropriate service from the `events.proto` definitions, and
- return the daemon instance.
-
- :param port: the port in which the daemon should be listening
- :type port: int
-
- :return: a daemon instance
- :rtype: EventsSingletonDaemon
- """
- raise NotImplementedError(self.ensure)
-
- @classmethod
- def ensure_service(cls, port, service):
- """
- Start the singleton instance if not already running.
-
- Might return ServiceAlreadyRunningException
-
- :param port: the port in which the daemon should be listening
- :type port: int
-
- :return: a daemon instance
- :rtype: EventsSingletonDaemon
- """
- daemon = cls(port, service)
- if not daemon.is_alive():
- daemon.start()
- elif port and port != cls.__instance._port:
- # service is running in this process but someone is trying to
- # start it in another port
- raise ServiceAlreadyRunningException(
- "Service is already running in this process on port %d."
- % self.__instance._port)
- return daemon
-
- @classmethod
- def get_instance(cls):
- """
- Retrieve singleton instance of this daemon.
-
- :return: a daemon instance
- :rtype: EventsSingletonDaemon
- """
- return cls.__instance
-
- def run(self):
- """
- Run the server.
- """
- self._server.run()
-
- def stop(self):
- """
- Stop the daemon.
- """
- self._server.stop()
-
- def get_port(self):
- """
- Retrieve the value of the port to which the service running in this
- daemon is binded to.
-
- :return: the port to which the daemon is binded to
- :rtype: int
- """
- if self._port is 0:
- self._port = self._server.port
- return self._port
diff --git a/src/leap/common/events/mac_auth.py b/src/leap/common/events/errors.py
index 49d48f7..58e0014 100644
--- a/src/leap/common/events/mac_auth.py
+++ b/src/leap/common/events/errors.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# mac_auth.py
-# Copyright (C) 2013 LEAP
+# errors.py
+# Copyright (C) 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -15,17 +15,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-Authentication system for events.
-This is not implemented yet.
-"""
-
-
-class MacMethod(object):
+class CallbackAlreadyRegisteredError(Exception):
"""
- Representation of possible MAC authentication methods.
+ Raised when trying to register an already registered callback.
"""
-
- MAC_NONE = 'none'
- MAC_HMAC = 'hmac'
+ pass
diff --git a/src/leap/common/events/events.proto b/src/leap/common/events/events.proto
deleted file mode 100644
index 2371b2a..0000000
--- a/src/leap/common/events/events.proto
+++ /dev/null
@@ -1,145 +0,0 @@
-// signal.proto
-// Copyright (C) 2013 LEAP
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package leap.common.events;
-option py_generic_services = true;
-
-
-// These are the events that can be signaled using the events mechanism.
-
-enum Event {
- CLIENT_SESSION_ID = 1;
- CLIENT_UID = 2;
- SOLEDAD_CREATING_KEYS = 3;
- SOLEDAD_DONE_CREATING_KEYS = 4;
- SOLEDAD_UPLOADING_KEYS = 5;
- SOLEDAD_DONE_UPLOADING_KEYS = 6;
- SOLEDAD_DOWNLOADING_KEYS = 7;
- SOLEDAD_DONE_DOWNLOADING_KEYS = 8;
- SOLEDAD_NEW_DATA_TO_SYNC = 9;
- SOLEDAD_DONE_DATA_SYNC = 10;
- UPDATER_NEW_UPDATES = 11;
- UPDATER_DONE_UPDATING = 12;
- RAISE_WINDOW = 13;
- SMTP_SERVICE_STARTED = 14;
- SMTP_SERVICE_FAILED_TO_START = 15;
- SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16;
- SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17;
- SMTP_RECIPIENT_REJECTED = 18;
- SMTP_START_ENCRYPT_AND_SIGN = 19;
- SMTP_END_ENCRYPT_AND_SIGN = 20;
- SMTP_START_SIGN = 21;
- SMTP_END_SIGN = 22;
- SMTP_SEND_MESSAGE_START = 23;
- SMTP_SEND_MESSAGE_SUCCESS = 24;
- SMTP_SEND_MESSAGE_ERROR = 25;
- SMTP_CONNECTION_LOST = 26;
- IMAP_SERVICE_STARTED = 30;
- IMAP_SERVICE_FAILED_TO_START = 31;
- IMAP_CLIENT_LOGIN = 32;
- IMAP_FETCHED_INCOMING = 33;
- IMAP_MSG_PROCESSING = 34;
- IMAP_MSG_DECRYPTED = 35;
- IMAP_MSG_SAVED_LOCALLY = 36;
- IMAP_MSG_DELETED_INCOMING = 37;
- IMAP_UNHANDLED_ERROR = 38;
- IMAP_UNREAD_MAIL = 39;
- KEYMANAGER_LOOKING_FOR_KEY = 40;
- KEYMANAGER_KEY_FOUND = 41;
- KEYMANAGER_KEY_NOT_FOUND = 42;
- KEYMANAGER_STARTED_KEY_GENERATION = 43;
- KEYMANAGER_FINISHED_KEY_GENERATION = 44;
- KEYMANAGER_DONE_UPLOADING_KEYS = 45;
- SOLEDAD_INVALID_AUTH_TOKEN = 46;
- SOLEDAD_SYNC_SEND_STATUS = 47;
- SOLEDAD_SYNC_RECEIVE_STATUS = 48;
-}
-
-
-// A SignalRequest is the type of the message sent from one component to request
-// that a signal be sent to every registered component.
-
-message SignalRequest {
- required Event event = 1;
- required string content = 2;
- required string mac_method = 3;
- required bytes mac = 4;
- optional string enc_method = 5;
- optional bool error_occurred = 6;
-}
-
-
-// A RegisterRequest message tells the server that a component wants to
-// be signaled whenever a specific event occurs.
-
-message RegisterRequest {
- required Event event = 1;
- required int32 port = 2;
- required string mac_method = 3;
- required bytes mac = 4;
-}
-
-
-// An UnregisterRequest message tells the server that a component does not
-// want to be signaled when a specific event occurs.
-
-message UnregisterRequest {
- required Event event = 1;
- required int32 port = 2;
- required string mac_method = 3;
- required bytes mac = 4;
-}
-
-
-// A PingRequest message is used to find out if a server or component is
-// alive.
-
-message PingRequest {
-}
-
-
-// The EventResponse is the message sent back by server and components after
-// they receive other kinds of requests.
-
-message EventResponse {
-
- enum Status {
- OK = 1;
- UNAUTH = 2;
- ERROR = 3;
- }
-
- required Status status = 1;
- optional string result = 2;
-}
-
-
-// The EventsServerService is the service provided by the server.
-
-service EventsServerService {
- rpc ping(PingRequest) returns (EventResponse);
- rpc register(RegisterRequest) returns (EventResponse);
- rpc unregister(UnregisterRequest) returns (EventResponse);
- rpc signal(SignalRequest) returns (EventResponse);
-}
-
-
-// EventsComponentService is the service provided by components (clients).
-
-service EventsClientService {
- rpc ping(PingRequest) returns (EventResponse);
- rpc signal(SignalRequest) returns (EventResponse);
-}
diff --git a/src/leap/common/events/events_pb2.py b/src/leap/common/events/events_pb2.py
deleted file mode 100644
index 2bf568f..0000000
--- a/src/leap/common/events/events_pb2.py
+++ /dev/null
@@ -1,635 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-
-from google.protobuf import descriptor
-from google.protobuf import message
-from google.protobuf import reflection
-from google.protobuf import service
-from google.protobuf import service_reflection
-from google.protobuf import descriptor_pb2
-# @@protoc_insertion_point(imports)
-
-
-
-DESCRIPTOR = descriptor.FileDescriptor(
- name='events.proto',
- package='leap.common.events',
- serialized_pb='\n\x0c\x65vents.proto\x12\x12leap.common.events\"\x97\x01\n\rSignalRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0f\n\x07\x63ontent\x18\x02 \x02(\t\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\x12\x12\n\nenc_method\x18\x05 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x06 \x01(\x08\"j\n\x0fRegisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"l\n\x11UnregisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"\r\n\x0bPingRequest\"\x82\x01\n\rEventResponse\x12\x38\n\x06status\x18\x01 \x02(\x0e\x32(.leap.common.events.EventResponse.Status\x12\x0e\n\x06result\x18\x02 \x01(\t\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03*\x9f\n\n\x05\x45vent\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\x12\x17\n\x13UPDATER_NEW_UPDATES\x10\x0b\x12\x19\n\x15UPDATER_DONE_UPDATING\x10\x0c\x12\x10\n\x0cRAISE_WINDOW\x10\r\x12\x18\n\x14SMTP_SERVICE_STARTED\x10\x0e\x12 \n\x1cSMTP_SERVICE_FAILED_TO_START\x10\x0f\x12%\n!SMTP_RECIPIENT_ACCEPTED_ENCRYPTED\x10\x10\x12\'\n#SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED\x10\x11\x12\x1b\n\x17SMTP_RECIPIENT_REJECTED\x10\x12\x12\x1f\n\x1bSMTP_START_ENCRYPT_AND_SIGN\x10\x13\x12\x1d\n\x19SMTP_END_ENCRYPT_AND_SIGN\x10\x14\x12\x13\n\x0fSMTP_START_SIGN\x10\x15\x12\x11\n\rSMTP_END_SIGN\x10\x16\x12\x1b\n\x17SMTP_SEND_MESSAGE_START\x10\x17\x12\x1d\n\x19SMTP_SEND_MESSAGE_SUCCESS\x10\x18\x12\x1b\n\x17SMTP_SEND_MESSAGE_ERROR\x10\x19\x12\x18\n\x14SMTP_CONNECTION_LOST\x10\x1a\x12\x18\n\x14IMAP_SERVICE_STARTED\x10\x1e\x12 \n\x1cIMAP_SERVICE_FAILED_TO_START\x10\x1f\x12\x15\n\x11IMAP_CLIENT_LOGIN\x10 \x12\x19\n\x15IMAP_FETCHED_INCOMING\x10!\x12\x17\n\x13IMAP_MSG_PROCESSING\x10\"\x12\x16\n\x12IMAP_MSG_DECRYPTED\x10#\x12\x1a\n\x16IMAP_MSG_SAVED_LOCALLY\x10$\x12\x1d\n\x19IMAP_MSG_DELETED_INCOMING\x10%\x12\x18\n\x14IMAP_UNHANDLED_ERROR\x10&\x12\x14\n\x10IMAP_UNREAD_MAIL\x10\'\x12\x1e\n\x1aKEYMANAGER_LOOKING_FOR_KEY\x10(\x12\x18\n\x14KEYMANAGER_KEY_FOUND\x10)\x12\x1c\n\x18KEYMANAGER_KEY_NOT_FOUND\x10*\x12%\n!KEYMANAGER_STARTED_KEY_GENERATION\x10+\x12&\n\"KEYMANAGER_FINISHED_KEY_GENERATION\x10,\x12\"\n\x1eKEYMANAGER_DONE_UPLOADING_KEYS\x10-\x12\x1e\n\x1aSOLEDAD_INVALID_AUTH_TOKEN\x10.\x12\x1c\n\x18SOLEDAD_SYNC_SEND_STATUS\x10/\x12\x1f\n\x1bSOLEDAD_SYNC_RECEIVE_STATUS\x10\x30\x32\xdd\x02\n\x13\x45ventsServerService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12R\n\x08register\x12#.leap.common.events.RegisterRequest\x1a!.leap.common.events.EventResponse\x12V\n\nunregister\x12%.leap.common.events.UnregisterRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse2\xb1\x01\n\x13\x45ventsClientService\x12J\n\x04ping\x12\x1f.leap.common.events.PingRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponseB\x03\x90\x01\x01')
-
-_EVENT = descriptor.EnumDescriptor(
- name='Event',
- full_name='leap.common.events.Event',
- filename=None,
- file=DESCRIPTOR,
- values=[
- descriptor.EnumValueDescriptor(
- name='CLIENT_SESSION_ID', index=0, number=1,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='CLIENT_UID', index=1, number=2,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_CREATING_KEYS', index=2, number=3,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_DONE_CREATING_KEYS', index=3, number=4,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_UPLOADING_KEYS', index=4, number=5,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_DONE_UPLOADING_KEYS', index=5, number=6,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_DOWNLOADING_KEYS', index=6, number=7,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_DONE_DOWNLOADING_KEYS', index=7, number=8,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_NEW_DATA_TO_SYNC', index=8, number=9,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_DONE_DATA_SYNC', index=9, number=10,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='UPDATER_NEW_UPDATES', index=10, number=11,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='UPDATER_DONE_UPDATING', index=11, number=12,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='RAISE_WINDOW', index=12, number=13,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_SERVICE_STARTED', index=13, number=14,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_SERVICE_FAILED_TO_START', index=14, number=15,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_RECIPIENT_ACCEPTED_ENCRYPTED', index=15, number=16,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED', index=16, number=17,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_RECIPIENT_REJECTED', index=17, number=18,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_START_ENCRYPT_AND_SIGN', index=18, number=19,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_END_ENCRYPT_AND_SIGN', index=19, number=20,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_START_SIGN', index=20, number=21,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_END_SIGN', index=21, number=22,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_SEND_MESSAGE_START', index=22, number=23,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_SEND_MESSAGE_SUCCESS', index=23, number=24,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_SEND_MESSAGE_ERROR', index=24, number=25,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SMTP_CONNECTION_LOST', index=25, number=26,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_SERVICE_STARTED', index=26, number=30,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_SERVICE_FAILED_TO_START', index=27, number=31,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_CLIENT_LOGIN', index=28, number=32,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_FETCHED_INCOMING', index=29, number=33,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_MSG_PROCESSING', index=30, number=34,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_MSG_DECRYPTED', index=31, number=35,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_MSG_SAVED_LOCALLY', index=32, number=36,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_MSG_DELETED_INCOMING', index=33, number=37,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_UNHANDLED_ERROR', index=34, number=38,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='IMAP_UNREAD_MAIL', index=35, number=39,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_LOOKING_FOR_KEY', index=36, number=40,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_KEY_FOUND', index=37, number=41,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_KEY_NOT_FOUND', index=38, number=42,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_STARTED_KEY_GENERATION', index=39, number=43,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_FINISHED_KEY_GENERATION', index=40, number=44,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='KEYMANAGER_DONE_UPLOADING_KEYS', index=41, number=45,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_INVALID_AUTH_TOKEN', index=42, number=46,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_SYNC_SEND_STATUS', index=43, number=47,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='SOLEDAD_SYNC_RECEIVE_STATUS', index=44, number=48,
- options=None,
- type=None),
- ],
- containing_type=None,
- options=None,
- serialized_start=557,
- serialized_end=1868,
-)
-
-
-CLIENT_SESSION_ID = 1
-CLIENT_UID = 2
-SOLEDAD_CREATING_KEYS = 3
-SOLEDAD_DONE_CREATING_KEYS = 4
-SOLEDAD_UPLOADING_KEYS = 5
-SOLEDAD_DONE_UPLOADING_KEYS = 6
-SOLEDAD_DOWNLOADING_KEYS = 7
-SOLEDAD_DONE_DOWNLOADING_KEYS = 8
-SOLEDAD_NEW_DATA_TO_SYNC = 9
-SOLEDAD_DONE_DATA_SYNC = 10
-UPDATER_NEW_UPDATES = 11
-UPDATER_DONE_UPDATING = 12
-RAISE_WINDOW = 13
-SMTP_SERVICE_STARTED = 14
-SMTP_SERVICE_FAILED_TO_START = 15
-SMTP_RECIPIENT_ACCEPTED_ENCRYPTED = 16
-SMTP_RECIPIENT_ACCEPTED_UNENCRYPTED = 17
-SMTP_RECIPIENT_REJECTED = 18
-SMTP_START_ENCRYPT_AND_SIGN = 19
-SMTP_END_ENCRYPT_AND_SIGN = 20
-SMTP_START_SIGN = 21
-SMTP_END_SIGN = 22
-SMTP_SEND_MESSAGE_START = 23
-SMTP_SEND_MESSAGE_SUCCESS = 24
-SMTP_SEND_MESSAGE_ERROR = 25
-SMTP_CONNECTION_LOST = 26
-IMAP_SERVICE_STARTED = 30
-IMAP_SERVICE_FAILED_TO_START = 31
-IMAP_CLIENT_LOGIN = 32
-IMAP_FETCHED_INCOMING = 33
-IMAP_MSG_PROCESSING = 34
-IMAP_MSG_DECRYPTED = 35
-IMAP_MSG_SAVED_LOCALLY = 36
-IMAP_MSG_DELETED_INCOMING = 37
-IMAP_UNHANDLED_ERROR = 38
-IMAP_UNREAD_MAIL = 39
-KEYMANAGER_LOOKING_FOR_KEY = 40
-KEYMANAGER_KEY_FOUND = 41
-KEYMANAGER_KEY_NOT_FOUND = 42
-KEYMANAGER_STARTED_KEY_GENERATION = 43
-KEYMANAGER_FINISHED_KEY_GENERATION = 44
-KEYMANAGER_DONE_UPLOADING_KEYS = 45
-SOLEDAD_INVALID_AUTH_TOKEN = 46
-SOLEDAD_SYNC_SEND_STATUS = 47
-SOLEDAD_SYNC_RECEIVE_STATUS = 48
-
-
-_EVENTRESPONSE_STATUS = descriptor.EnumDescriptor(
- name='Status',
- full_name='leap.common.events.EventResponse.Status',
- filename=None,
- file=DESCRIPTOR,
- values=[
- descriptor.EnumValueDescriptor(
- name='OK', index=0, number=1,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='UNAUTH', index=1, number=2,
- options=None,
- type=None),
- descriptor.EnumValueDescriptor(
- name='ERROR', index=2, number=3,
- options=None,
- type=None),
- ],
- containing_type=None,
- options=None,
- serialized_start=515,
- serialized_end=554,
-)
-
-
-_SIGNALREQUEST = descriptor.Descriptor(
- name='SignalRequest',
- full_name='leap.common.events.SignalRequest',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- descriptor.FieldDescriptor(
- name='event', full_name='leap.common.events.SignalRequest.event', index=0,
- number=1, type=14, cpp_type=8, label=2,
- has_default_value=False, default_value=1,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='content', full_name='leap.common.events.SignalRequest.content', index=1,
- number=2, type=9, cpp_type=9, label=2,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac_method', full_name='leap.common.events.SignalRequest.mac_method', index=2,
- number=3, type=9, cpp_type=9, label=2,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac', full_name='leap.common.events.SignalRequest.mac', index=3,
- number=4, type=12, cpp_type=9, label=2,
- has_default_value=False, default_value="",
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='enc_method', full_name='leap.common.events.SignalRequest.enc_method', index=4,
- number=5, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='error_occurred', full_name='leap.common.events.SignalRequest.error_occurred', index=5,
- number=6, type=8, cpp_type=7, label=1,
- has_default_value=False, default_value=False,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- options=None,
- is_extendable=False,
- extension_ranges=[],
- serialized_start=37,
- serialized_end=188,
-)
-
-
-_REGISTERREQUEST = descriptor.Descriptor(
- name='RegisterRequest',
- full_name='leap.common.events.RegisterRequest',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- descriptor.FieldDescriptor(
- name='event', full_name='leap.common.events.RegisterRequest.event', index=0,
- number=1, type=14, cpp_type=8, label=2,
- has_default_value=False, default_value=1,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='port', full_name='leap.common.events.RegisterRequest.port', index=1,
- number=2, type=5, cpp_type=1, label=2,
- has_default_value=False, default_value=0,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac_method', full_name='leap.common.events.RegisterRequest.mac_method', index=2,
- number=3, type=9, cpp_type=9, label=2,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac', full_name='leap.common.events.RegisterRequest.mac', index=3,
- number=4, type=12, cpp_type=9, label=2,
- has_default_value=False, default_value="",
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- options=None,
- is_extendable=False,
- extension_ranges=[],
- serialized_start=190,
- serialized_end=296,
-)
-
-
-_UNREGISTERREQUEST = descriptor.Descriptor(
- name='UnregisterRequest',
- full_name='leap.common.events.UnregisterRequest',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- descriptor.FieldDescriptor(
- name='event', full_name='leap.common.events.UnregisterRequest.event', index=0,
- number=1, type=14, cpp_type=8, label=2,
- has_default_value=False, default_value=1,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='port', full_name='leap.common.events.UnregisterRequest.port', index=1,
- number=2, type=5, cpp_type=1, label=2,
- has_default_value=False, default_value=0,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac_method', full_name='leap.common.events.UnregisterRequest.mac_method', index=2,
- number=3, type=9, cpp_type=9, label=2,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='mac', full_name='leap.common.events.UnregisterRequest.mac', index=3,
- number=4, type=12, cpp_type=9, label=2,
- has_default_value=False, default_value="",
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- options=None,
- is_extendable=False,
- extension_ranges=[],
- serialized_start=298,
- serialized_end=406,
-)
-
-
-_PINGREQUEST = descriptor.Descriptor(
- name='PingRequest',
- full_name='leap.common.events.PingRequest',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- options=None,
- is_extendable=False,
- extension_ranges=[],
- serialized_start=408,
- serialized_end=421,
-)
-
-
-_EVENTRESPONSE = descriptor.Descriptor(
- name='EventResponse',
- full_name='leap.common.events.EventResponse',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- descriptor.FieldDescriptor(
- name='status', full_name='leap.common.events.EventResponse.status', index=0,
- number=1, type=14, cpp_type=8, label=2,
- has_default_value=False, default_value=1,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- descriptor.FieldDescriptor(
- name='result', full_name='leap.common.events.EventResponse.result', index=1,
- number=2, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=unicode("", "utf-8"),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- _EVENTRESPONSE_STATUS,
- ],
- options=None,
- is_extendable=False,
- extension_ranges=[],
- serialized_start=424,
- serialized_end=554,
-)
-
-_SIGNALREQUEST.fields_by_name['event'].enum_type = _EVENT
-_REGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT
-_UNREGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT
-_EVENTRESPONSE.fields_by_name['status'].enum_type = _EVENTRESPONSE_STATUS
-_EVENTRESPONSE_STATUS.containing_type = _EVENTRESPONSE;
-DESCRIPTOR.message_types_by_name['SignalRequest'] = _SIGNALREQUEST
-DESCRIPTOR.message_types_by_name['RegisterRequest'] = _REGISTERREQUEST
-DESCRIPTOR.message_types_by_name['UnregisterRequest'] = _UNREGISTERREQUEST
-DESCRIPTOR.message_types_by_name['PingRequest'] = _PINGREQUEST
-DESCRIPTOR.message_types_by_name['EventResponse'] = _EVENTRESPONSE
-
-class SignalRequest(message.Message):
- __metaclass__ = reflection.GeneratedProtocolMessageType
- DESCRIPTOR = _SIGNALREQUEST
-
- # @@protoc_insertion_point(class_scope:leap.common.events.SignalRequest)
-
-class RegisterRequest(message.Message):
- __metaclass__ = reflection.GeneratedProtocolMessageType
- DESCRIPTOR = _REGISTERREQUEST
-
- # @@protoc_insertion_point(class_scope:leap.common.events.RegisterRequest)
-
-class UnregisterRequest(message.Message):
- __metaclass__ = reflection.GeneratedProtocolMessageType
- DESCRIPTOR = _UNREGISTERREQUEST
-
- # @@protoc_insertion_point(class_scope:leap.common.events.UnregisterRequest)
-
-class PingRequest(message.Message):
- __metaclass__ = reflection.GeneratedProtocolMessageType
- DESCRIPTOR = _PINGREQUEST
-
- # @@protoc_insertion_point(class_scope:leap.common.events.PingRequest)
-
-class EventResponse(message.Message):
- __metaclass__ = reflection.GeneratedProtocolMessageType
- DESCRIPTOR = _EVENTRESPONSE
-
- # @@protoc_insertion_point(class_scope:leap.common.events.EventResponse)
-
-
-_EVENTSSERVERSERVICE = descriptor.ServiceDescriptor(
- name='EventsServerService',
- full_name='leap.common.events.EventsServerService',
- file=DESCRIPTOR,
- index=0,
- options=None,
- serialized_start=1871,
- serialized_end=2220,
- methods=[
- descriptor.MethodDescriptor(
- name='ping',
- full_name='leap.common.events.EventsServerService.ping',
- index=0,
- containing_service=None,
- input_type=_PINGREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
- descriptor.MethodDescriptor(
- name='register',
- full_name='leap.common.events.EventsServerService.register',
- index=1,
- containing_service=None,
- input_type=_REGISTERREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
- descriptor.MethodDescriptor(
- name='unregister',
- full_name='leap.common.events.EventsServerService.unregister',
- index=2,
- containing_service=None,
- input_type=_UNREGISTERREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
- descriptor.MethodDescriptor(
- name='signal',
- full_name='leap.common.events.EventsServerService.signal',
- index=3,
- containing_service=None,
- input_type=_SIGNALREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
-])
-
-class EventsServerService(service.Service):
- __metaclass__ = service_reflection.GeneratedServiceType
- DESCRIPTOR = _EVENTSSERVERSERVICE
-class EventsServerService_Stub(EventsServerService):
- __metaclass__ = service_reflection.GeneratedServiceStubType
- DESCRIPTOR = _EVENTSSERVERSERVICE
-
-
-_EVENTSCLIENTSERVICE = descriptor.ServiceDescriptor(
- name='EventsClientService',
- full_name='leap.common.events.EventsClientService',
- file=DESCRIPTOR,
- index=1,
- options=None,
- serialized_start=2223,
- serialized_end=2400,
- methods=[
- descriptor.MethodDescriptor(
- name='ping',
- full_name='leap.common.events.EventsClientService.ping',
- index=0,
- containing_service=None,
- input_type=_PINGREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
- descriptor.MethodDescriptor(
- name='signal',
- full_name='leap.common.events.EventsClientService.signal',
- index=1,
- containing_service=None,
- input_type=_SIGNALREQUEST,
- output_type=_EVENTRESPONSE,
- options=None,
- ),
-])
-
-class EventsClientService(service.Service):
- __metaclass__ = service_reflection.GeneratedServiceType
- DESCRIPTOR = _EVENTSCLIENTSERVICE
-class EventsClientService_Stub(EventsClientService):
- __metaclass__ = service_reflection.GeneratedServiceStubType
- DESCRIPTOR = _EVENTSCLIENTSERVICE
-
-# @@protoc_insertion_point(module_scope)
diff --git a/src/leap/common/events/Makefile b/src/leap/common/events/flags.py
index 5b7e60d..137f663 100644
--- a/src/leap/common/events/Makefile
+++ b/src/leap/common/events/flags.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
-# Makefile
-# Copyright (C) 2013 LEAP
+# __init__.py
+# Copyright (C) 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,18 +14,15 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Flags for the events framework.
+"""
+from leap.common.check import leap_assert
-# This file is used to generate protobuf python files that are used for IPC:
-#
-# https://developers.google.com/protocol-buffers/docs/pythontutorial
-
-PROTOC = protoc
-
-all: events_pb2.py
+EVENTS_ENABLED = True
-%_pb2.py: %.proto
- $(PROTOC) --python_out=./ $<
-# autopep8 --in-place --aggressive $@
-clean:
- rm -f *_pb2.py
+def set_events_enabled(flag):
+ leap_assert(isinstance(flag, bool))
+ global EVENTS_ENABLED
+ EVENTS_ENABLED = flag
diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py
index 41aede3..a69202e 100644
--- a/src/leap/common/events/server.py
+++ b/src/leap/common/events/server.py
@@ -14,223 +14,79 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""
-A server for the events mechanism.
-
-A server can receive different kinds of requests from clients:
- 1. Registration request: store client port number to be notified when
- a specific signal arrives.
- 2. Signal request: redistribute the signal to registered clients.
"""
-import logging
-import socket
+The server for the events mechanism.
+"""
-from protobuf.socketrpc import RpcService
-from leap.common.events import (
- events_pb2 as proto,
- daemon,
-)
+import logging
+import txzmq
+from leap.common.zmq_utils import zmq_has_curve
-logger = logging.getLogger(__name__)
+from leap.common.events.zmq_components import TxZmqServerComponent
-SERVER_PORT = 8090
+if zmq_has_curve():
+ EMIT_ADDR = "tcp://127.0.0.1:9000"
+ REG_ADDR = "tcp://127.0.0.1:9001"
+else:
+ EMIT_ADDR = "ipc:///tmp/leap.common.events.socket.0"
+ REG_ADDR = "ipc:///tmp/leap.common.events.socket.1"
-# the `registered_clients` dictionary below should have the following
-# format:
-#
-# { event_signal: [ port, ... ], ... }
-#
-registered_clients = {}
-
-class PortAlreadyTaken(Exception):
- """
- Raised when trying to open a server in a port that is already taken.
- """
- pass
+logger = logging.getLogger(__name__)
-def ensure_server(port=SERVER_PORT):
+def ensure_server(emit_addr=EMIT_ADDR, reg_addr=REG_ADDR):
"""
- Make sure the server is running on the given port.
+ Make sure the server is running in the given addresses.
- Attempt to connect to given local port. Upon success, assume that the
- events server has already been started. Upon failure, start events server.
+ :param emit_addr: The address in which to receive events from clients.
+ :type emit_addr: str
+ :param reg_addr: The address to which publish events to clients.
+ :type reg_addr: str
- :param port: the port in which server should be listening
- :type port: int
-
- :return: the daemon instance or nothing
- :rtype: EventsServerDaemon or None
-
- :raise PortAlreadyTaken: Raised if C{port} is already taken by something
- that is not an events server.
- """
- try:
- # check if port is available
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('localhost', port))
- s.close()
- # port is taken, check if there's a server running there
- response = ping(port=port, timeout=1000)
- if response is not None and response.status == proto.EventResponse.OK:
- logger.info('A server is already running on port %d.', port)
- return
- # port is taken, and not by an events server
- logger.warning(
- 'Port %d is taken by something not an events server.', port)
- raise PortAlreadyTaken(port)
- except socket.error:
- # port is available, run a server
- logger.info('Launching server on port %d.', port)
- return EventsServerDaemon.ensure(port)
-
-
-def ping(port=SERVER_PORT, reqcbk=None, timeout=1000):
+ :return: an events server instance
+ :rtype: EventsServer
"""
- Ping the server.
-
- :param port: the port in which server should be listening
- :type port: int
- :param reqcbk: a callback to be called when a response from server is
- received
- :type reqcbk: function(proto.PingRequest, proto.EventResponse)
- :param timeout: the timeout for synch calls
- :type timeout: int
-
- :return: the response from server for synch calls or nothing for asynch
- calls.
- :rtype: leap.common.events.events_pb2.EventsResponse or None
- """
- request = proto.PingRequest()
- service = RpcService(
- proto.EventsServerService_Stub,
- port,
- 'localhost')
- logger.debug("Pinging server in port %d..." % port)
- return service.ping(request, callback=reqcbk, timeout=timeout)
+ _server = EventsServer(emit_addr, reg_addr)
+ return _server
-class EventsServerService(proto.EventsServerService):
+class EventsServer(TxZmqServerComponent):
"""
- Service for receiving events in clients.
+ An events server that listens for events in one address and publishes those
+ events in another address.
"""
- def register(self, controller, request, done):
- """
- Register a client port to be signaled when specific events come in.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.info("Received registration request: %s..." % str(request)[:40])
- # add client port to signal list
- if request.event not in registered_clients:
- registered_clients[request.event] = set([])
- registered_clients[request.event].add(request.port)
- # send response back to client
-
- logger.debug('sending response back')
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
-
- def unregister(self, controller, request, done):
- """
- Unregister a client port so it will not be signaled when specific
- events come in.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.info(
- "Received unregistration request: %s..." % str(request)[:40])
- # remove client port from signal list
- response = proto.EventResponse()
- if request.event in registered_clients:
- try:
- registered_clients[request.event].remove(request.port)
- response.status = proto.EventResponse.OK
- except KeyError:
- response.status = proto.EventsResponse.ERROR
- response.result = 'Port %d not registered.' % request.port
- # send response back to client
- logger.debug('sending response back')
- done.run(response)
-
- def signal(self, controller, request, done):
- """
- Perform an RPC call to signal all clients registered to receive a
- specific signal.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.SignalRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.debug('Received signal from client: %s...', str(request)[:40])
- # send signal to all registered clients
- # TODO: verify signal auth
- if request.event in registered_clients:
- for port in registered_clients[request.event]:
-
- def callback(req, resp):
- logger.debug("Signal received by " + str(port))
-
- service = RpcService(proto.EventsClientService_Stub,
- port, 'localhost')
- service.signal(request, callback=callback)
- # send response back to client
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
-
- def ping(self, controller, request, done):
+ def __init__(self, emit_addr, reg_addr):
"""
- Reply to a ping request.
-
- :param controller: used to mediate a single method call
- :type controller: protobuf.socketrpc.controller.SocketRpcController
- :param request: the request received from the client
- :type request: leap.common.events.events_pb2.RegisterRequest
- :param done: callback to be called when done
- :type done: protobuf.socketrpc.server.Callback
- """
- logger.debug("Received ping request, sending response.")
- response = proto.EventResponse()
- response.status = proto.EventResponse.OK
- done.run(response)
-
+ Initialize the events server.
-class EventsServerDaemon(daemon.EventsSingletonDaemon):
- """
- Singleton class for starting an events server daemon.
- """
-
- @classmethod
- def ensure(cls, port):
+ :param emit_addr: The address in which to receive events from clients.
+ :type emit_addr: str
+ :param reg_addr: The address to which publish events to clients.
+ :type reg_addr: str
"""
- Make sure the daemon is running on the given port.
-
- :param port: the port in which the daemon should listen
- :type port: int
+ TxZmqServerComponent.__init__(self)
+ # bind PULL and PUB sockets
+ self._pull, self.pull_port = self._zmq_bind(
+ txzmq.ZmqPullConnection, emit_addr)
+ self._pub, self.pub_port = self._zmq_bind(
+ txzmq.ZmqPubConnection, reg_addr)
+ # set a handler for arriving messages
+ self._pull.onPull = self._onPull
+
+ def _onPull(self, message):
+ """
+ Callback executed when a message is pulled from a client.
- :return: a daemon instance
- :rtype: EventsServerDaemon
+ :param message: The message sent by the client.
+ :type message: str
"""
- return cls.ensure_service(port, EventsServerService())
+ event, content = message[0].split(b"\0", 1)
+ logger.debug("Publishing event: %s" % event)
+ self._pub.publish(content, tag=event)
diff --git a/src/leap/common/events/tests/__init__.py b/src/leap/common/events/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/leap/common/events/tests/__init__.py
diff --git a/src/leap/common/events/tests/test_zmq_components.py b/src/leap/common/events/tests/test_zmq_components.py
new file mode 100644
index 0000000..c51e37e
--- /dev/null
+++ b/src/leap/common/events/tests/test_zmq_components.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+# test_zmq_components.py
+# Copyright (C) 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for the zmq_components module.
+"""
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.common.events import zmq_components
+
+
+class AddrParseTestCase(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_addr_parsing(self):
+ addr_re = zmq_components.ADDRESS_RE
+
+ self.assertEqual(
+ addr_re.search("ipc:///tmp/foo/bar/baaz-2/foo.0").groups(),
+ ("ipc", "/tmp/foo/bar/baaz-2/foo.0", None))
+ self.assertEqual(
+ addr_re.search("tcp://localhost:9000").groups(),
+ ("tcp", "localhost", "9000"))
+ self.assertEqual(
+ addr_re.search("tcp://127.0.0.1:9000").groups(),
+ ("tcp", "127.0.0.1", "9000"))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/common/events/txclient.py b/src/leap/common/events/txclient.py
new file mode 100644
index 0000000..dfd0533
--- /dev/null
+++ b/src/leap/common/events/txclient.py
@@ -0,0 +1,194 @@
+# -*- coding: utf-8 -*-
+# txclient.py
+# Copyright (C) 2013, 2014, 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+The client end point of the events mechanism, implemented using txzmq.
+
+Clients are the communicating parties of the events mechanism. They
+communicate by sending messages to a server, which in turn redistributes
+messages to other clients.
+
+When a client registers a callback for a given event, it also tells the
+server that it wants to be notified whenever events of that type are sent by
+some other client.
+"""
+import logging
+import pickle
+
+import txzmq
+
+from leap.common.events.zmq_components import TxZmqClientComponent
+from leap.common.events.client import EventsClient
+from leap.common.events.client import configure_client
+from leap.common.events.server import EMIT_ADDR
+from leap.common.events.server import REG_ADDR
+from leap.common.events import catalog
+
+
+logger = logging.getLogger(__name__)
+
+
+__all__ = [
+ "configure_client",
+ "EventsTxClient",
+ "register",
+ "unregister",
+ "emit",
+ "shutdown",
+]
+
+
+class EventsTxClient(TxZmqClientComponent, EventsClient):
+ """
+ A twisted events client that listens for events in one address and
+ publishes those events to another address.
+ """
+
+ def __init__(self, emit_addr=EMIT_ADDR, reg_addr=REG_ADDR,
+ path_prefix=None):
+ """
+ Initialize the events server.
+ """
+ TxZmqClientComponent.__init__(self, path_prefix=path_prefix)
+ EventsClient.__init__(self, emit_addr, reg_addr)
+ # connect SUB first, otherwise we might miss some event sent from this
+ # same client
+ self._sub = self._zmq_connect(txzmq.ZmqSubConnection, reg_addr)
+ self._sub.gotMessage = self._gotMessage
+ self._push = self._zmq_connect(txzmq.ZmqPushConnection, emit_addr)
+
+ def _gotMessage(self, msg, tag):
+ """
+ Handle an incoming event.
+
+ :param msg: The incoming message.
+ :type msg: list(str)
+ """
+ event = getattr(catalog, tag)
+ content = pickle.loads(msg)
+ self._handle_event(event, content)
+
+ def _subscribe(self, tag):
+ """
+ Subscribe to a tag on the zmq SUB socket.
+
+ :param tag: The tag to be subscribed.
+ :type tag: str
+ """
+ self._sub.subscribe(tag)
+
+ def _unsubscribe(self, tag):
+ """
+ Unsubscribe from a tag on the zmq SUB socket.
+
+ :param tag: The tag to be unsubscribed.
+ :type tag: str
+ """
+ self._sub.unsubscribe(tag)
+
+ def _send(self, data):
+ """
+ Send data through PUSH socket.
+
+ :param data: The data to be sent.
+ :type event: str
+ """
+ self._push.send(data)
+
+ def _run_callback(self, callback, event, content):
+ """
+ Run a callback.
+
+ :param callback: The callback to be run.
+ :type callback: callable(event, *content)
+ :param event: The event to be sent.
+ :type event: Event
+ :param content: The content of the event.
+ :type content: list
+ """
+ callback(event, *content)
+
+ def shutdown(self):
+ TxZmqClientComponent.shutdown(self)
+ EventsClient.shutdown(self)
+
+
+def register(event, callback, uid=None, replace=False):
+ """
+ Register a callback to be executed when an event is received.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param callback: The callback to be executed.
+ :type callback: callable(event, content)
+ :param uid: The callback uid.
+ :type uid: str
+ :param replace: Wether an eventual callback with same ID should be
+ replaced.
+ :type replace: bool
+
+ :return: The callback uid.
+ :rtype: str
+
+ :raises CallbackAlreadyRegisteredError: when there's already a callback
+ identified by the given uid and replace is False.
+ """
+ return EventsTxClient.instance().register(
+ event, callback, uid=uid, replace=replace)
+
+
+def unregister(event, uid=None):
+ """
+ Unregister callbacks for an event.
+
+ If uid is not None, then only the callback identified by the given uid is
+ removed. Otherwise, all callbacks for the event are removed.
+
+ :param event: The event that triggers the callback.
+ :type event: str
+ :param uid: The callback uid.
+ :type uid: str
+ """
+ return EventsTxClient.instance().unregister(event, uid=uid)
+
+
+def emit(event, *content):
+ """
+ Send an event.
+
+ :param event: The event to be sent.
+ :type event: str
+ :param content: The content of the event.
+ :type content: list
+ """
+ return EventsTxClient.instance().emit(event, *content)
+
+
+def shutdown():
+ """
+ Shutdown the events client.
+ """
+ EventsTxClient.instance().shutdown()
+
+
+def instance():
+ """
+ Return an instance of the events client.
+
+ :return: An instance of the events client.
+ :rtype: EventsClientThread
+ """
+ return EventsTxClient.instance()
diff --git a/src/leap/common/events/zmq_components.py b/src/leap/common/events/zmq_components.py
new file mode 100644
index 0000000..f99c754
--- /dev/null
+++ b/src/leap/common/events/zmq_components.py
@@ -0,0 +1,188 @@
+# -*- coding: utf-8 -*-
+# zmq.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""
+The server for the events mechanism.
+"""
+
+
+import os
+import logging
+import txzmq
+import re
+
+from abc import ABCMeta
+
+# XXX some distros don't package libsodium, so we have to be prepared for
+# absence of zmq.auth
+try:
+ import zmq.auth
+ from zmq.auth.thread import ThreadAuthenticator
+except ImportError:
+ pass
+
+from leap.common.config import get_path_prefix
+from leap.common.zmq_utils import zmq_has_curve
+from leap.common.zmq_utils import maybe_create_and_get_certificates
+from leap.common.zmq_utils import PUBLIC_KEYS_PREFIX
+
+
+logger = logging.getLogger(__name__)
+
+
+ADDRESS_RE = re.compile("^([a-z]+)://([^:]+):?(\d+)?$")
+
+
+class TxZmqComponent(object):
+ """
+ A twisted-powered zmq events component.
+ """
+
+ __metaclass__ = ABCMeta
+
+ _component_type = None
+
+ def __init__(self, path_prefix=None):
+ """
+ Initialize the txzmq component.
+ """
+ self._factory = txzmq.ZmqFactory()
+ self._factory.registerForShutdown()
+ if path_prefix is None:
+ path_prefix = get_path_prefix()
+ self._config_prefix = os.path.join(path_prefix, "leap", "events")
+ self._connections = []
+
+ @property
+ def component_type(self):
+ if not self._component_type:
+ raise Exception(
+ "Make sure implementations of TxZmqComponent"
+ "define a self._component_type!")
+ return self._component_type
+
+ def _zmq_connect(self, connClass, address):
+ """
+ Connect to an address.
+
+ :param connClass: The connection class to be used.
+ :type connClass: txzmq.ZmqConnection
+ :param address: The address to connect to.
+ :type address: str
+
+ :return: The binded connection.
+ :rtype: txzmq.ZmqConnection
+ """
+ connection = connClass(self._factory)
+ # create and configure socket
+ socket = connection.socket
+ if zmq_has_curve():
+ public, secret = maybe_create_and_get_certificates(
+ self._config_prefix, self.component_type)
+ server_public_file = os.path.join(
+ self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
+ server_public, _ = zmq.auth.load_certificate(server_public_file)
+ socket.curve_publickey = public
+ socket.curve_secretkey = secret
+ socket.curve_serverkey = server_public
+ socket.connect(address)
+ logger.debug("Connected %s to %s." % (connClass, address))
+ self._connections.append(connection)
+ return connection
+
+ def _zmq_bind(self, connClass, address):
+ """
+ Bind to an address.
+
+ :param connClass: The connection class to be used.
+ :type connClass: txzmq.ZmqConnection
+ :param address: The address to bind to.
+ :type address: str
+
+ :return: The binded connection and port.
+ :rtype: (txzmq.ZmqConnection, int)
+ """
+ connection = connClass(self._factory)
+ socket = connection.socket
+ if zmq_has_curve():
+ public, secret = maybe_create_and_get_certificates(
+ self._config_prefix, self.component_type)
+ socket.curve_publickey = public
+ socket.curve_secretkey = secret
+ self._start_thread_auth(connection.socket)
+
+ proto, addr, port = ADDRESS_RE.search(address).groups()
+
+ if proto == "tcp":
+ if port is None or port is '0':
+ params = proto, addr
+ port = socket.bind_to_random_port("%s://%s" % params)
+ logger.debug("Binded %s to %s://%s." % ((connClass,) + params))
+ else:
+ params = proto, addr, int(port)
+ socket.bind("%s://%s:%d" % params)
+ logger.debug(
+ "Binded %s to %s://%s:%d." % ((connClass,) + params))
+ else:
+ params = proto, addr
+ socket.bind("%s://%s" % params)
+ logger.debug(
+ "Binded %s to %s://%s" % ((connClass,) + params))
+ self._connections.append(connection)
+ return connection, port
+
+ def _start_thread_auth(self, socket):
+ """
+ Start the zmq curve thread authenticator.
+
+ :param socket: The socket in which to configure the authenticator.
+ :type socket: zmq.Socket
+ """
+ authenticator = ThreadAuthenticator(self._factory.context)
+ authenticator.start()
+ # XXX do not hardcode this here.
+ authenticator.allow('127.0.0.1')
+ # tell authenticator to use the certificate in a directory
+ public_keys_dir = os.path.join(self._config_prefix, PUBLIC_KEYS_PREFIX)
+ authenticator.configure_curve(domain="*", location=public_keys_dir)
+ socket.curve_server = True # must come before bind
+
+ def shutdown(self):
+ """
+ Shutdown the component.
+ """
+ logger.debug("Shutting down component %s." % str(self))
+ for conn in self._connections:
+ conn.shutdown()
+ self._factory.shutdown()
+
+
+class TxZmqServerComponent(TxZmqComponent):
+ """
+ A txZMQ server component.
+ """
+
+ _component_type = "server"
+
+
+class TxZmqClientComponent(TxZmqComponent):
+ """
+ A txZMQ client component.
+ """
+
+ _component_type = "client"
diff --git a/src/leap/common/http.py b/src/leap/common/http.py
new file mode 100644
index 0000000..0dee3a2
--- /dev/null
+++ b/src/leap/common/http.py
@@ -0,0 +1,348 @@
+# -*- coding: utf-8 -*-
+# http.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Twisted HTTP/HTTPS client.
+"""
+
+try:
+ import twisted
+ assert twisted
+except ImportError:
+ print "*******"
+ print "Twisted is needed to use leap.common.http module"
+ print ""
+ print "Install the extra requirement of the package:"
+ print "$ pip install leap.common[Twisted]"
+ import sys
+ sys.exit(1)
+
+
+from leap.common.certs import get_compatible_ssl_context_factory
+from leap.common.check import leap_assert
+
+from zope.interface import implements
+
+from twisted.internet import reactor
+from twisted.internet import defer
+from twisted.python import failure
+
+from twisted.web.client import Agent
+from twisted.web.client import HTTPConnectionPool
+from twisted.web.client import _HTTP11ClientFactory as HTTP11ClientFactory
+from twisted.web.client import readBody
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import IBodyProducer
+from twisted.web._newclient import HTTP11ClientProtocol
+
+
+__all__ = ["HTTPClient"]
+
+
+# A default HTTP timeout is used for 2 distinct purposes:
+# 1. as HTTP connection timeout, prior to connection estabilshment.
+# 2. as data reception timeout, after the connection has been established.
+DEFAULT_HTTP_TIMEOUT = 30 # seconds
+
+
+class _HTTP11ClientFactory(HTTP11ClientFactory):
+ """
+ A timeout-able HTTP 1.1 client protocol factory.
+ """
+
+ def __init__(self, quiescentCallback, timeout):
+ """
+ :param quiescentCallback: The quiescent callback to be passed to
+ protocol instances, used to return them to
+ the connection pool.
+ :type quiescentCallback: callable(Protocol)
+ :param timeout: The timeout, in seconds, for requests made by
+ protocols created by this factory.
+ :type timeout: float
+ """
+ HTTP11ClientFactory.__init__(self, quiescentCallback)
+ self._timeout = timeout
+
+ def buildProtocol(self, _):
+ """
+ Build the HTTP 1.1 client protocol.
+ """
+ return _HTTP11ClientProtocol(self._quiescentCallback, self._timeout)
+
+
+class _HTTPConnectionPool(HTTPConnectionPool):
+ """
+ A timeout-able HTTP connection pool.
+ """
+
+ _factory = _HTTP11ClientFactory
+
+ def __init__(self, reactor, persistent, timeout, maxPersistentPerHost=10):
+ HTTPConnectionPool.__init__(self, reactor, persistent=persistent)
+ self.maxPersistentPerHost = maxPersistentPerHost
+ self._timeout = timeout
+
+ def _newConnection(self, key, endpoint):
+ def quiescentCallback(protocol):
+ self._putConnection(key, protocol)
+ factory = self._factory(quiescentCallback, timeout=self._timeout)
+ return endpoint.connect(factory)
+
+
+class HTTPClient(object):
+ """
+ HTTP client done the twisted way, with a main focus on pinning the SSL
+ certificate.
+
+ By default, it uses a shared connection pool. If you want a dedicated
+ one, create and pass on __init__ pool parameter.
+ Please note that this client will limit the maximum amount of connections
+ by using a DeferredSemaphore.
+ This limit is equal to the maxPersistentPerHost used on pool and is needed
+ in order to avoid resource abuse on huge requests batches.
+ """
+
+ _pool = _HTTPConnectionPool(
+ reactor,
+ persistent=True,
+ timeout=DEFAULT_HTTP_TIMEOUT,
+ maxPersistentPerHost=10
+ )
+
+ def __init__(self, cert_file=None,
+ timeout=DEFAULT_HTTP_TIMEOUT, pool=None):
+ """
+ Init the HTTP client
+
+ :param cert_file: The path to the certificate file, if None given the
+ system's CAs will be used.
+ :type cert_file: str
+ :param timeout: The amount of time that this Agent will wait for the
+ peer to accept a connection and for each request to be
+ finished. If a pool is passed, then this argument is
+ ignored.
+ :type timeout: float
+ """
+
+ self._timeout = timeout
+ self._pool = pool if pool is not None else self._pool
+ self._agent = Agent(
+ reactor,
+ get_compatible_ssl_context_factory(cert_file),
+ pool=self._pool,
+ connectTimeout=self._timeout)
+ self._semaphore = defer.DeferredSemaphore(
+ self._pool.maxPersistentPerHost)
+
+ def _createPool(self, maxPersistentPerHost=10, persistent=True):
+ pool = _HTTPConnectionPool(reactor, persistent, self._timeout)
+ pool.maxPersistentPerHost = maxPersistentPerHost
+ return pool
+
+ def _request(self, url, method, body, headers, callback):
+ """
+ Perform an HTTP request.
+
+ :param url: The URL for the request.
+ :type url: str
+ :param method: The HTTP method of the request.
+ :type method: str
+ :param body: The body of the request, if any.
+ :type body: str
+ :param headers: The headers of the request.
+ :type headers: dict
+ :param callback: A callback to be added to the request's deferred
+ callback chain.
+ :type callback: callable
+
+ :return: A deferred that fires with the body of the request.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ if body:
+ body = _StringBodyProducer(body)
+ d = self._agent.request(
+ method, url, headers=Headers(headers), bodyProducer=body)
+ d.addCallback(callback)
+ return d
+
+ def request(self, url, method='GET', body=None, headers={},
+ callback=readBody):
+ """
+ Perform an HTTP request, but limit the maximum amount of concurrent
+ connections.
+
+ May be passed a callback to be added to the request's deferred
+ callback chain. The callback is expected to receive the response of
+ the request and may do whatever it wants with the response. By
+ default, if no callback is passed, we will use a simple body reader
+ which returns a deferred that is fired with the body of the response.
+
+ :param url: The URL for the request.
+ :type url: str
+ :param method: The HTTP method of the request.
+ :type method: str
+ :param body: The body of the request, if any.
+ :type body: str
+ :param headers: The headers of the request.
+ :type headers: dict
+ :param callback: A callback to be added to the request's deferred
+ callback chain.
+ :type callback: callable
+
+ :return: A deferred that fires with the body of the request.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ leap_assert(
+ callable(callback),
+ message="The callback parameter should be a callable!")
+ return self._semaphore.run(self._request, url, method, body, headers,
+ callback)
+
+ def close(self):
+ """
+ Close any cached connections.
+ """
+ self._pool.closeCachedConnections()
+
+#
+# An IBodyProducer to write the body of an HTTP request as a string.
+#
+
+
+class _StringBodyProducer(object):
+ """
+ A producer that writes the body of a request to a consumer.
+ """
+
+ implements(IBodyProducer)
+
+ def __init__(self, body):
+ """
+ Initialize the string produer.
+
+ :param body: The body of the request.
+ :type body: str
+ """
+ self.body = body
+ self.length = len(body)
+
+ def startProducing(self, consumer):
+ """
+ Write the body to the consumer.
+
+ :param consumer: Any IConsumer provider.
+ :type consumer: twisted.internet.interfaces.IConsumer
+
+ :return: A successful deferred.
+ :rtype: twisted.internet.defer.Deferred
+ """
+ consumer.write(self.body)
+ return defer.succeed(None)
+
+ def pauseProducing(self):
+ pass
+
+ def stopProducing(self):
+ pass
+
+
+#
+# Patched twisted.web classes
+#
+
+class _HTTP11ClientProtocol(HTTP11ClientProtocol):
+ """
+ A timeout-able HTTP 1.1 client protocol, that is instantiated by the
+ _HTTP11ClientFactory below.
+ """
+
+ def __init__(self, quiescentCallback, timeout):
+ """
+ Initialize the protocol.
+
+ :param quiescentCallback:
+ :type quiescentCallback: callable
+ :param timeout: A timeout, in seconds, for requests made by this
+ protocol.
+ :type timeout: float
+ """
+ HTTP11ClientProtocol.__init__(self, quiescentCallback)
+ self._timeout = timeout
+ self._timeoutCall = None
+
+ def request(self, request):
+ """
+ Issue request over self.transport and return a Deferred which
+ will fire with a Response instance or an error.
+
+ :param request: The object defining the parameters of the request to
+ issue.
+ :type request: twisted.web._newclient.Request
+
+ :return: A deferred which fires after the request has finished.
+ :rtype: Deferred
+ """
+ d = HTTP11ClientProtocol.request(self, request)
+ if self._timeout:
+ self._last_buffer_len = 0
+ timeoutCall = reactor.callLater(
+ self._timeout, self._doTimeout, request)
+ self._timeoutCall = timeoutCall
+ return d
+
+ def _doTimeout(self, request):
+ """
+ Give up the request because of a timeout.
+
+ :param request: The object defining the parameters of the request to
+ issue.
+ :type request: twisted.web._newclient.Request
+ """
+ self._giveUp(
+ failure.Failure(
+ defer.TimeoutError(
+ "Getting %s took longer than %s seconds."
+ % (request.absoluteURI, self._timeout))))
+
+ def _cancelTimeout(self):
+ """
+ Cancel the request timeout, when it's finished.
+ """
+ if self._timeoutCall and self._timeoutCall.active():
+ self._timeoutCall.cancel()
+ self._timeoutCall = None
+
+ def _finishResponse(self, rest):
+ """
+ Cancel the timeout when finished receiving the response.
+ """
+ self._cancelTimeout()
+ HTTP11ClientProtocol._finishResponse(self, rest)
+
+ def dataReceived(self, bytes):
+ """
+ Receive some data and extend the timeout period of this request.
+
+ :param bytes: A string of indeterminate length.
+ :type bytes: str
+ """
+ HTTP11ClientProtocol.dataReceived(self, bytes)
+ if self._timeoutCall and self._timeoutCall.active():
+ self._timeoutCall.reset(self._timeout)
+
+ def connectionLost(self, reason):
+ self._cancelTimeout()
+ return HTTP11ClientProtocol.connectionLost(self, reason)
diff --git a/src/leap/common/plugins.py b/src/leap/common/plugins.py
new file mode 100644
index 0000000..04152f9
--- /dev/null
+++ b/src/leap/common/plugins.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+# plugins.py
+# Copyright (C) 2015 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Twisted plugins leap utilities.
+"""
+import os.path
+
+from twisted.plugin import getPlugins
+
+from leap.common.config import get_path_prefix
+
+# A whitelist of modules from where to collect plugins dynamically.
+# For the moment restricted to leap namespace, but the idea is that we can pass
+# other "trusted" modules as options to the initialization of soledad.
+
+# TODO discover all the namespace automagically
+
+PLUGGABLE_LEAP_MODULES = ('mail', 'keymanager')
+
+_preffix = get_path_prefix()
+rc_file = os.path.join(_preffix, "leap", "leap.cfg")
+
+
+def _get_extra_pluggable_modules():
+ import ConfigParser
+ config = ConfigParser.RawConfigParser()
+ config.read(rc_file)
+ try:
+ modules = eval(
+ config.get('plugins', 'extra_pluggable_modules'), {}, {})
+ except (ConfigParser.NoSectionError, ConfigParser.NoOptionError,
+ ConfigParser.MissingSectionHeaderError):
+ modules = []
+ return modules
+
+if os.path.isfile(rc_file):
+ # TODO in the case of being called from the standalone client,
+ # we should pass the flag in some other way.
+ EXTRA_PLUGGABLE_MODULES = _get_extra_pluggable_modules()
+else:
+ EXTRA_PLUGGABLE_MODULES = []
+
+
+def collect_plugins(interface):
+ """
+ Traverse a whitelist of modules and collect all the plugins that implement
+ the passed interface.
+ """
+ plugins = []
+ for namespace in PLUGGABLE_LEAP_MODULES:
+ try:
+ module = __import__('leap.%s.plugins' % namespace, fromlist='.')
+ plugins = plugins + list(getPlugins(interface, module))
+ except ImportError:
+ pass
+ for namespace in EXTRA_PLUGGABLE_MODULES:
+ try:
+ module = __import__('%s.plugins' % namespace, fromlist='.')
+ plugins = plugins + list(getPlugins(interface, module))
+ except ImportError:
+ pass
+ return plugins
diff --git a/src/leap/common/testing/basetest.py b/src/leap/common/testing/basetest.py
index 54826d5..3d3cee0 100644
--- a/src/leap/common/testing/basetest.py
+++ b/src/leap/common/testing/basetest.py
@@ -28,8 +28,13 @@ except ImportError:
import unittest
from leap.common.check import leap_assert
+from leap.common.events import server as events_server
+from leap.common.events import client as events_client
+from leap.common.events import flags, set_events_enabled
from leap.common.files import mkdir_p, check_and_fix_urw_only
+set_events_enabled(False)
+
class BaseLeapTest(unittest.TestCase):
"""
@@ -40,6 +45,14 @@ class BaseLeapTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
+ cls.setUpEnv()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.tearDownEnv()
+
+ @classmethod
+ def setUpEnv(cls):
"""
Sets up common facilities for testing this TestCase:
- custom PATH and HOME environmental variables
@@ -48,6 +61,9 @@ class BaseLeapTest(unittest.TestCase):
"""
cls.old_path = os.environ['PATH']
cls.old_home = os.environ['HOME']
+ cls.old_xdg_config = None
+ if "XDG_CONFIG_HOME" in os.environ:
+ cls.old_xdg_config = os.environ["XDG_CONFIG_HOME"]
cls.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
cls.home = cls.tempdir
bin_tdir = os.path.join(
@@ -55,20 +71,41 @@ class BaseLeapTest(unittest.TestCase):
'bin')
os.environ["PATH"] = bin_tdir
os.environ["HOME"] = cls.tempdir
+ os.environ["XDG_CONFIG_HOME"] = os.path.join(cls.tempdir, ".config")
+ cls._init_events()
@classmethod
- def tearDownClass(cls):
+ def _init_events(cls):
+ if flags.EVENTS_ENABLED:
+ cls._server = events_server.ensure_server(
+ emit_addr="tcp://127.0.0.1:0",
+ reg_addr="tcp://127.0.0.1:0")
+ events_client.configure_client(
+ emit_addr="tcp://127.0.0.1:%d" % cls._server.pull_port,
+ reg_addr="tcp://127.0.0.1:%d" % cls._server.pub_port)
+
+ @classmethod
+ def tearDownEnv(cls):
"""
Cleanup common facilities used for testing this TestCase:
- restores the default PATH and HOME variables
- removes the temporal folder
"""
+ if flags.EVENTS_ENABLED:
+ events_client.shutdown()
+ cls._server.shutdown()
+
os.environ["PATH"] = cls.old_path
os.environ["HOME"] = cls.old_home
+ if cls.old_xdg_config is not None:
+ os.environ["XDG_CONFIG_HOME"] = cls.old_xdg_config
# safety check! please do not wipe my home...
# XXX needs to adapt to non-linuces
leap_assert(
cls.tempdir.startswith('/tmp/leap_tests-') or
+ (cls.tempdir.startswith('/tmp/') and
+ cls.tempdir.startswith(tempfile.gettempdir()) and
+ 'leap_tests-' in cls.tempdir) or
cls.tempdir.startswith('/var/folder'),
"beware! tried to remove a dir which does not "
"live in temporal folder!")
diff --git a/src/leap/common/testing/leaptest_combined_keycert.pem b/src/leap/common/testing/leaptest_combined_keycert.pem
new file mode 100644
index 0000000..f1e0fb2
--- /dev/null
+++ b/src/leap/common/testing/leaptest_combined_keycert.pem
@@ -0,0 +1,127 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDAT67c8y4ORa6o
+/gor3gAouJI6W6R7YlPzzh4uaba4YhqgAEZzcZ7S589CqON8Eo8G2tQvh+ZSaNmP
+idFDjFxfpb1sBMb9Nu720s0V8UMxzCDh7oyePZGdKP+F04+MkB0eHWSObBrzsjsj
+KVsD/5fUeLbd1SdK1C2cKzsoRQ+GVEyMJeiISFlV05fxdAZSybPcwH+3aiVX+7cM
+9ek8bufIeqrAfZVdvO/2Fuv6WfR2MEe32XO/9gj02XSGWd6sPXMn/lflKvNNKEWC
+UWdqTArGi0CqQ4D+Q73ruUIthCJr+7LWK+7QKG980a1RR7GZJvs3skhUDh3eua2u
+ICPiqQLlAgMBAAECggEBAKJr6j0UagaF1dFG1eJc2neKA36kXdQTpOIaaKU8haVO
+vjv6X4YrJT/tpsAfEhp9Ni1M7r7CIcXiZjVz6bkKOA5UVhqAImxEVClEuw/YN688
+P11yc3NGftBkiwNFPk0yflUr7/zV0yGVm5rD1+oVme9KkO/kkg4CDA+E9664PTdu
+S4NVvL6OgHUG2nucqIz0kzUBapo7okIkvggfldeDYF47rn/e0VikNpKkMzoCzWs+
+3ryzldfigwg18rE2nltQYk/Mi+H30iNTNEonLjs9YbiDPn8iy5SZ7xY4lAO1Urev
+skdY04hmkOzh9JEETHeTj1LUIw7tfRdS3X6B+19gkCECgYEA+41UOLBb0ZHsGjsA
+aZBtI4g8mvGTOFe/Az7Jm0404z6e8AIe4NeSJoJ6oV6WG2BmKvqTViYT2QnmauHA
+WB1xJ5fH3PK7/LjICI2QE247E294+g5p1KYysALc2fChfJNpdfETzWJAUSSBFqxb
+amCDZ4Gc8S/V44fEkJKPLuAvxL0CgYEAw7YyGsEX+lDGzumFQY4wz20+Cyvilnx0
+xoFGQ85RSprZttU64PID/u1HD9/Nv4lGcBDcTTtrmOEk9x51YzttrsPF9sn+Wj0y
+eahuR32Qc1652Y7fAKgN2vIZRFBcGpAsemcoqmYFRMImX60G0areKaclooimTbJA
+Si52P4IKnUkCgYAI+07ah1F/9hncBedJ3aJH9oFTdvSuulNTplZEeVJiGsZKA4le
+tdO+FEKUqG/rolGDj1bbaJik0zmq70yS2NpFc6HrPa+Aoohh5cwTJYhudTh4lTMq
+KJT+u9tu3KynagwF7gmq96scOpVxXc4VykRm2bXk1rRoX1yhXNpH7jFGcQKBgCn/
+v2DebzbYftGIa4BV80OQPfBHyqhgrO6sb1e9vtQzxuTlfW0ogpMCeG1/qbegzeze
+sWghiEWWi0g80RQqfK80dBcx4dObrmlNK91LpOQdP+TgNBr/9Xk22xU96YYJyoG6
+AZAPtLG8uF9v0jbMZECsDfeDO60Qw5snvViDn6OBAoGACAbbQCbuh0cduDVwn0uz
+8XDyRTKDhOsJ3p6UB1TjwvbLeoPI93Dv1gpEeoqELtCu+ZXr3+/i/IbwrxucrFNn
+L/s+4zR2mlEAxBdtCFsH/Qf7+iYpBFSo4YReXz3xR+EVTmswzodJCbtF74oQSr6o
+7d56Rd/5k2UkFeXnlGOLyAc=
+-----END PRIVATE KEY-----
+Certificate:
+ Data:
+ Version: 3 (0x2)
+ Serial Number: 4 (0x4)
+ Signature Algorithm: sha1WithRSAEncryption
+ Issuer: C=US, ST=CA, L=Cyberspace, O=LEAP Encryption Access Project, OU=cyberspace, CN=tests-leap.se/name=tests-leap/emailAddress=tests@leap.se
+ Validity
+ Not Before: Sep 3 17:52:16 2013 GMT
+ Not After : Sep 1 17:52:16 2023 GMT
+ Subject: C=US, ST=CA, L=Cyberspace, O=LEAP Encryption Access Project, OU=cyberspace, CN=localhost/name=tests-leap/emailAddress=tests@leap.se
+ Subject Public Key Info:
+ Public Key Algorithm: rsaEncryption
+ Public-Key: (2048 bit)
+ Modulus:
+ 00:c0:4f:ae:dc:f3:2e:0e:45:ae:a8:fe:0a:2b:de:
+ 00:28:b8:92:3a:5b:a4:7b:62:53:f3:ce:1e:2e:69:
+ b6:b8:62:1a:a0:00:46:73:71:9e:d2:e7:cf:42:a8:
+ e3:7c:12:8f:06:da:d4:2f:87:e6:52:68:d9:8f:89:
+ d1:43:8c:5c:5f:a5:bd:6c:04:c6:fd:36:ee:f6:d2:
+ cd:15:f1:43:31:cc:20:e1:ee:8c:9e:3d:91:9d:28:
+ ff:85:d3:8f:8c:90:1d:1e:1d:64:8e:6c:1a:f3:b2:
+ 3b:23:29:5b:03:ff:97:d4:78:b6:dd:d5:27:4a:d4:
+ 2d:9c:2b:3b:28:45:0f:86:54:4c:8c:25:e8:88:48:
+ 59:55:d3:97:f1:74:06:52:c9:b3:dc:c0:7f:b7:6a:
+ 25:57:fb:b7:0c:f5:e9:3c:6e:e7:c8:7a:aa:c0:7d:
+ 95:5d:bc:ef:f6:16:eb:fa:59:f4:76:30:47:b7:d9:
+ 73:bf:f6:08:f4:d9:74:86:59:de:ac:3d:73:27:fe:
+ 57:e5:2a:f3:4d:28:45:82:51:67:6a:4c:0a:c6:8b:
+ 40:aa:43:80:fe:43:bd:eb:b9:42:2d:84:22:6b:fb:
+ b2:d6:2b:ee:d0:28:6f:7c:d1:ad:51:47:b1:99:26:
+ fb:37:b2:48:54:0e:1d:de:b9:ad:ae:20:23:e2:a9:
+ 02:e5
+ Exponent: 65537 (0x10001)
+ X509v3 extensions:
+ X509v3 Basic Constraints:
+ CA:FALSE
+ Netscape Cert Type:
+ SSL Server
+ Netscape Comment:
+ Easy-RSA Generated Server Certificate
+ X509v3 Subject Key Identifier:
+ 51:92:B6:A3:D7:D6:EC:8F:FC:16:C5:D4:0F:87:CC:EA:5C:7C:17:81
+ X509v3 Authority Key Identifier:
+ keyid:36:19:70:96:A9:5C:FE:A3:82:0F:79:95:31:52:2B:4A:41:BD:81:CB
+ DirName:/C=US/ST=CA/L=Cyberspace/O=LEAP Encryption Access Project/OU=cyberspace/CN=tests-leap.se/name=tests-leap/emailAddress=tests@leap.se
+ serial:8B:BF:41:63:1E:10:6A:EC
+
+ X509v3 Extended Key Usage:
+ TLS Web Server Authentication
+ X509v3 Key Usage:
+ Digital Signature, Key Encipherment
+ Signature Algorithm: sha1WithRSAEncryption
+ 88:d9:35:e0:d9:fa:fd:6b:57:e2:4d:f6:ef:91:6f:56:a6:2b:
+ 1a:1e:ec:8f:b0:18:e3:ec:ca:c9:1e:78:07:1d:0f:cf:fe:09:
+ 21:84:25:c4:27:ea:22:d7:48:53:73:ed:78:0f:42:5c:c7:f7:
+ 38:04:84:df:67:99:fd:75:6f:e8:dc:3b:91:ab:fa:c7:32:e5:
+ fd:3b:ce:de:7c:6a:df:39:46:1e:46:3a:4d:e1:e1:60:f3:bf:
+ aa:b2:0b:5d:ee:f2:0c:ee:82:7f:b5:02:75:04:47:d5:2b:c8:
+ e0:6d:6a:10:4a:ca:e0:c3:4f:ee:ff:15:71:37:f5:4d:95:38:
+ fe:a3:da:84:46:90:04:c2:61:86:a0:7f:e2:7d:62:46:6d:f6:
+ f8:90:51:88:c3:f2:8c:ca:b3:89:40:9f:6b:8b:33:65:e1:fd:
+ 0f:8b:d7:6a:93:dc:de:be:85:07:c7:d1:1d:b5:db:70:54:9f:
+ 95:d8:fb:11:f7:a7:e6:90:ba:9b:28:0e:3d:47:7a:63:6d:60:
+ 44:f6:96:aa:b6:a2:bc:0a:e5:25:c8:a2:74:91:54:95:bb:e2:
+ 09:01:56:73:6e:56:e8:6f:d6:a5:d8:18:96:c1:82:ef:2c:9e:
+ e2:4c:94:bc:00:71:5e:16:49:6b:e4:94:7a:d1:0c:2e:f4:19:
+ b2:2a:c2:b8
+-----BEGIN CERTIFICATE-----
+MIIFdDCCBFygAwIBAgIBBDANBgkqhkiG9w0BAQUFADCBuDELMAkGA1UEBhMCVVMx
+CzAJBgNVBAgTAkNBMRMwEQYDVQQHEwpDeWJlcnNwYWNlMScwJQYDVQQKEx5MRUFQ
+IEVuY3J5cHRpb24gQWNjZXNzIFByb2plY3QxEzARBgNVBAsTCmN5YmVyc3BhY2Ux
+FjAUBgNVBAMTDXRlc3RzLWxlYXAuc2UxEzARBgNVBCkTCnRlc3RzLWxlYXAxHDAa
+BgkqhkiG9w0BCQEWDXRlc3RzQGxlYXAuc2UwHhcNMTMwOTAzMTc1MjE2WhcNMjMw
+OTAxMTc1MjE2WjCBtDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAkNBMRMwEQYDVQQH
+EwpDeWJlcnNwYWNlMScwJQYDVQQKEx5MRUFQIEVuY3J5cHRpb24gQWNjZXNzIFBy
+b2plY3QxEzARBgNVBAsTCmN5YmVyc3BhY2UxEjAQBgNVBAMTCWxvY2FsaG9zdDET
+MBEGA1UEKRMKdGVzdHMtbGVhcDEcMBoGCSqGSIb3DQEJARYNdGVzdHNAbGVhcC5z
+ZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMBPrtzzLg5Frqj+Cive
+ACi4kjpbpHtiU/POHi5ptrhiGqAARnNxntLnz0Ko43wSjwba1C+H5lJo2Y+J0UOM
+XF+lvWwExv027vbSzRXxQzHMIOHujJ49kZ0o/4XTj4yQHR4dZI5sGvOyOyMpWwP/
+l9R4tt3VJ0rULZwrOyhFD4ZUTIwl6IhIWVXTl/F0BlLJs9zAf7dqJVf7twz16Txu
+58h6qsB9lV287/YW6/pZ9HYwR7fZc7/2CPTZdIZZ3qw9cyf+V+Uq800oRYJRZ2pM
+CsaLQKpDgP5Dveu5Qi2EImv7stYr7tAob3zRrVFHsZkm+zeySFQOHd65ra4gI+Kp
+AuUCAwEAAaOCAYkwggGFMAkGA1UdEwQCMAAwEQYJYIZIAYb4QgEBBAQDAgZAMDQG
+CWCGSAGG+EIBDQQnFiVFYXN5LVJTQSBHZW5lcmF0ZWQgU2VydmVyIENlcnRpZmlj
+YXRlMB0GA1UdDgQWBBRRkraj19bsj/wWxdQPh8zqXHwXgTCB7QYDVR0jBIHlMIHi
+gBQ2GXCWqVz+o4IPeZUxUitKQb2By6GBvqSBuzCBuDELMAkGA1UEBhMCVVMxCzAJ
+BgNVBAgTAkNBMRMwEQYDVQQHEwpDeWJlcnNwYWNlMScwJQYDVQQKEx5MRUFQIEVu
+Y3J5cHRpb24gQWNjZXNzIFByb2plY3QxEzARBgNVBAsTCmN5YmVyc3BhY2UxFjAU
+BgNVBAMTDXRlc3RzLWxlYXAuc2UxEzARBgNVBCkTCnRlc3RzLWxlYXAxHDAaBgkq
+hkiG9w0BCQEWDXRlc3RzQGxlYXAuc2WCCQCLv0FjHhBq7DATBgNVHSUEDDAKBggr
+BgEFBQcDATALBgNVHQ8EBAMCBaAwDQYJKoZIhvcNAQEFBQADggEBAIjZNeDZ+v1r
+V+JN9u+Rb1amKxoe7I+wGOPsyskeeAcdD8/+CSGEJcQn6iLXSFNz7XgPQlzH9zgE
+hN9nmf11b+jcO5Gr+scy5f07zt58at85Rh5GOk3h4WDzv6qyC13u8gzugn+1AnUE
+R9UryOBtahBKyuDDT+7/FXE39U2VOP6j2oRGkATCYYagf+J9YkZt9viQUYjD8ozK
+s4lAn2uLM2Xh/Q+L12qT3N6+hQfH0R2123BUn5XY+xH3p+aQupsoDj1HemNtYET2
+lqq2orwK5SXIonSRVJW74gkBVnNuVuhv1qXYGJbBgu8snuJMlLwAcV4WSWvklHrR
+DC70GbIqwrg=
+-----END CERTIFICATE-----
diff --git a/src/leap/common/testing/test_basetest.py b/src/leap/common/testing/test_basetest.py
index cf0962d..ec42a62 100644
--- a/src/leap/common/testing/test_basetest.py
+++ b/src/leap/common/testing/test_basetest.py
@@ -83,12 +83,10 @@ class TestInitBaseLeapTest(BaseLeapTest):
"""
def setUp(self):
- """nuke it"""
- pass
+ self.setUpEnv()
def tearDown(self):
- """nuke it"""
- pass
+ self.tearDownEnv()
def test_path_is_changed(self):
"""tests whether we have changed the PATH env var"""
diff --git a/src/leap/common/tests/test_certs.py b/src/leap/common/tests/test_certs.py
new file mode 100644
index 0000000..8ebc0f4
--- /dev/null
+++ b/src/leap/common/tests/test_certs.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# test_certs.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/common/certs.py
+"""
+import os
+import time
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.common import certs
+from leap.common.testing.basetest import BaseLeapTest
+
+TEST_CERT_PEM = os.path.join(
+ os.path.split(__file__)[0],
+ '..', 'testing', "leaptest_combined_keycert.pem")
+
+# Values from the test cert file:
+# Not Before: Sep 3 17:52:16 2013 GMT
+# Not After : Sep 1 17:52:16 2023 GMT
+CERT_NOT_BEFORE = (2013, 9, 3, 17, 52, 16, 1, 246, 0)
+CERT_NOT_AFTER = (2023, 9, 1, 17, 52, 16, 4, 244, 0)
+
+
+class CertsTest(BaseLeapTest):
+
+ def setUp(self):
+ self.setUpEnv()
+
+ def tearDown(self):
+ self.tearDownEnv()
+
+ def test_should_redownload_if_no_cert(self):
+ self.assertTrue(certs.should_redownload(certfile=""))
+
+ def test_should_redownload_if_invalid_pem(self):
+ cert_path = self.get_tempfile('test_pem_file.pem')
+
+ with open(cert_path, 'w') as f:
+ f.write('this is some invalid data for the pem file')
+
+ self.assertTrue(certs.should_redownload(cert_path))
+
+ def test_should_redownload_if_before(self):
+ def new_now():
+ time.struct_time(CERT_NOT_BEFORE)
+ self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now))
+
+ def test_should_redownload_if_after(self):
+ def new_now():
+ time.struct_time(CERT_NOT_AFTER)
+ self.assertTrue(certs.should_redownload(TEST_CERT_PEM, now=new_now))
+
+ def test_not_should_redownload(self):
+ self.assertFalse(certs.should_redownload(TEST_CERT_PEM))
+
+ def test_is_valid_pemfile(self):
+ with open(TEST_CERT_PEM) as f:
+ cert_data = f.read()
+
+ self.assertTrue(certs.is_valid_pemfile(cert_data))
+
+ def test_not_is_valid_pemfile(self):
+ cert_data = 'this is some invalid data for the pem file'
+
+ self.assertFalse(certs.is_valid_pemfile(cert_data))
+
+ def test_get_cert_time_boundaries(self):
+ """
+ This test ensures us that the returned values are returned in UTC/GMT.
+ """
+ with open(TEST_CERT_PEM) as f:
+ cert_data = f.read()
+
+ valid_from, valid_to = certs.get_cert_time_boundaries(cert_data)
+ self.assertEqual(tuple(valid_from), CERT_NOT_BEFORE)
+ self.assertEqual(tuple(valid_to), CERT_NOT_AFTER)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py
index 0779b2e..2ad097e 100644
--- a/src/leap/common/tests/test_events.py
+++ b/src/leap/common/tests/test_events.py
@@ -1,4 +1,4 @@
-## -*- coding: utf-8 -*-
+# -*- coding: utf-8 -*-
# test_events.py
# Copyright (C) 2013 LEAP
#
@@ -15,414 +15,184 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-import sets
-import time
-import socket
-import threading
-import random
-
-
-from mock import Mock
-from protobuf.socketrpc import RpcService
-from leap.common import events
-from leap.common.events import (
- server,
- client,
- mac_auth,
-)
-from leap.common.events.events_pb2 import (
- EventsServerService,
- EventsServerService_Stub,
- EventsClientService_Stub,
- EventResponse,
- SignalRequest,
- RegisterRequest,
- PingRequest,
- SOLEDAD_CREATING_KEYS,
- CLIENT_UID,
-)
+import os
+import logging
+import time
-port = 8090
+from twisted.internet.reactor import callFromThread
+from twisted.trial import unittest
+from twisted.internet import defer
-received = False
+from leap.common.events import server
+from leap.common.events import client
+from leap.common.events import flags
+from leap.common.events import txclient
+from leap.common.events import catalog
+from leap.common.events.errors import CallbackAlreadyRegisteredError
-class EventsTestCase(unittest.TestCase):
+if 'DEBUG' in os.environ:
+ logging.basicConfig(level=logging.DEBUG)
- @classmethod
- def setUpClass(cls):
- server.EventsServerDaemon.ensure(8090)
- cls.callbacks = events.client.registered_callbacks
- @classmethod
- def tearDownClass(cls):
- # give some time for requests to be processed.
- time.sleep(1)
+class EventsGenericClientTestCase(object):
def setUp(self):
- super(EventsTestCase, self).setUp()
+ flags.set_events_enabled(True)
+ self._server = server.ensure_server(
+ emit_addr="tcp://127.0.0.1:0",
+ reg_addr="tcp://127.0.0.1:0")
+ self._client.configure_client(
+ emit_addr="tcp://127.0.0.1:%d" % self._server.pull_port,
+ reg_addr="tcp://127.0.0.1:%d" % self._server.pub_port)
def tearDown(self):
- #events.client.registered_callbacks = {}
- server.registered_clients = {}
- super(EventsTestCase, self).tearDown()
-
- def test_service_singleton(self):
- """
- Ensure that there's always just one instance of the server daemon
- running.
- """
- service1 = server.EventsServerDaemon.ensure(8090)
- service2 = server.EventsServerDaemon.ensure(8090)
- self.assertEqual(service1, service2,
- "Can't get singleton class for service.")
+ self._client.shutdown()
+ self._server.shutdown()
+ flags.set_events_enabled(False)
+ # wait a bit for sockets to close properly
+ time.sleep(0.1)
def test_client_register(self):
"""
Ensure clients can register callbacks.
"""
- self.assertTrue(1 not in self.callbacks,
- 'There should should be no callback for this signal.')
- events.register(1, lambda x: True)
- self.assertTrue(1 in self.callbacks,
- 'Could not register signal in local client.')
- events.register(2, lambda x: True)
- self.assertTrue(1 in self.callbacks,
- 'Could not register signal in local client.')
- self.assertTrue(2 in self.callbacks,
- 'Could not register signal in local client.')
+ callbacks = self._client.instance().callbacks
+ self.assertTrue(len(callbacks) == 0,
+ 'There should be no callback for this event.')
+ # register one event
+ event1 = catalog.CLIENT_UID
- def test_register_signal_replace(self):
- """
- Make sure clients can replace already registered callbacks.
- """
- sig = 3
- cbk = lambda x: True
- events.register(sig, cbk, uid=1)
- self.assertRaises(Exception, events.register, sig, lambda x: True,
- uid=1)
- events.register(sig, lambda x: True, uid=1, replace=True)
- self.assertTrue(sig in self.callbacks, 'Could not register signal.')
- self.assertEqual(1, len(self.callbacks[sig]),
- 'Wrong number of registered callbacks.')
-
- def test_signal_response_status(self):
- """
- Ensure there's an appropriate response from server when signaling.
- """
- sig = 4
- request = SignalRequest()
- request.event = sig
- request.content = 'my signal contents'
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- # test synch
- response = service.signal(request, timeout=1000)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_signal_executes_callback(self):
- """
- Ensure callback is executed upon receiving signal.
- """
- sig = CLIENT_UID
- request = SignalRequest()
- request.event = sig
- request.content = 'my signal contents'
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
-
- # register a callback
- flag = Mock()
- events.register(sig, lambda req: flag(req.event))
- # signal
- response = service.signal(request)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
- time.sleep(1) # wait for signal to arrive
- flag.assert_called_once_with(sig)
-
- def test_events_server_service_register(self):
- """
- Ensure the server can register clients to be signaled.
- """
- sig = 5
- request = RegisterRequest()
- request.event = sig
- request.port = 8091
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- complist = server.registered_clients
- self.assertEqual({}, complist,
- 'There should be no registered_ports when '
- 'server has just been created.')
- response = service.register(request, timeout=1000)
- self.assertTrue(sig in complist, "Signal not registered succesfully.")
- self.assertTrue(8091 in complist[sig],
- 'Failed registering client port.')
-
- def test_client_request_register(self):
- """
- Ensure clients can register themselves with server.
- """
- sig = 6
- complist = server.registered_clients
- self.assertTrue(sig not in complist,
- 'There should be no registered clients for this '
- 'signal.')
- events.register(sig, lambda x: True)
- time.sleep(0.1)
- port = client.EventsClientDaemon.get_instance().get_port()
- self.assertTrue(sig in complist, 'Failed registering client.')
- self.assertTrue(port in complist[sig],
- 'Failed registering client port.')
+ def cbk1(event, _):
+ return True
- def test_client_receives_signal(self):
- """
- Ensure clients can receive signals.
- """
- sig = 7
- flag = Mock()
-
- events.register(sig, lambda req: flag(req.event))
- request = SignalRequest()
- request.event = sig
- request.content = ""
- request.mac_method = mac_auth.MacMethod.MAC_NONE
- request.mac = ""
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- response = service.signal(request, timeout=1000)
- self.assertTrue(response is not None, 'Did not receive response.')
- time.sleep(0.5)
- flag.assert_called_once_with(sig)
-
- def test_client_send_signal(self):
- """
- Ensure clients can send signals.
- """
- sig = 8
- response = events.signal(sig)
- self.assertTrue(response.status == response.OK,
- 'Received wrong response status when signaling.')
+ uid1 = self._client.register(event1, cbk1)
+ # assert for correct registration
+ self.assertTrue(len(callbacks) == 1)
+ self.assertTrue(callbacks[event1][uid1] == cbk1,
+ 'Could not register event in local client.')
+ # register another event
+ event2 = catalog.CLIENT_SESSION_ID
- def test_client_unregister_all(self):
- """
- Test that the client can unregister all events for one signal.
- """
- sig = CLIENT_UID
- complist = server.registered_clients
- events.register(sig, lambda x: True)
- events.register(sig, lambda x: True)
- time.sleep(0.1)
- events.unregister(sig)
- time.sleep(0.1)
- port = client.EventsClientDaemon.get_instance().get_port()
- self.assertFalse(bool(complist[sig]))
- self.assertTrue(port not in complist[sig])
+ def cbk2(event, _):
+ return True
- def test_client_unregister_by_uid(self):
- """
- Test that the client can unregister an event by uid.
- """
- sig = CLIENT_UID
- complist = server.registered_clients
- events.register(sig, lambda x: True, uid='cbkuid')
- events.register(sig, lambda x: True, uid='cbkuid2')
- time.sleep(0.1)
- events.unregister(sig, uid='cbkuid')
- time.sleep(0.1)
- port = client.EventsClientDaemon.get_instance().get_port()
- self.assertTrue(sig in complist)
- self.assertTrue(len(complist[sig]) == 1)
- self.assertTrue(
- client.registered_callbacks[sig].pop()[0] == 'cbkuid2')
- self.assertTrue(port in complist[sig])
-
- def test_server_replies_ping(self):
- """
- Ensure server replies to a ping.
- """
- request = PingRequest()
- service = RpcService(EventsServerService_Stub, port, 'localhost')
- response = service.ping(request, timeout=1000)
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_client_replies_ping(self):
- """
- Ensure clients reply to a ping.
- """
- daemon = client.ensure_client_daemon()
- port = daemon.get_port()
- request = PingRequest()
- service = RpcService(EventsClientService_Stub, port, 'localhost')
- response = service.ping(request, timeout=1000)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
-
- def test_server_ping(self):
- """
- Ensure the function from server module pings correctly.
- """
- response = server.ping()
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
+ uid2 = self._client.register(event2, cbk2)
+ # assert for correct registration
+ self.assertTrue(len(callbacks) == 2)
+ self.assertTrue(callbacks[event2][uid2] == cbk2,
+ 'Could not register event in local client.')
- def test_client_ping(self):
+ def test_register_signal_replace(self):
"""
- Ensure the function from client module pings correctly.
+ Make sure clients can replace already registered callbacks.
"""
- daemon = client.ensure_client_daemon()
- response = client.ping(daemon.get_port())
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
- def test_module_ping_server(self):
- """
- Ensure the function from main module pings server correctly.
- """
- response = events.ping_server()
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
+ def cbk_fail(event, _):
+ return callFromThread(d.errback, event)
- def test_module_ping_client(self):
- """
- Ensure the function from main module pings clients correctly.
- """
- daemon = client.ensure_client_daemon()
- response = events.ping_client(daemon.get_port())
- self.assertIsNotNone(response)
- self.assertEqual(EventResponse.OK, response.status,
- 'Wrong response status.')
+ def cbk_succeed(event, _):
+ return callFromThread(d.callback, event)
+
+ self._client.register(event, cbk_fail, uid=1)
+ self._client.register(event, cbk_succeed, uid=1, replace=True)
+ self._client.emit(event, None)
+ return d
- def test_ensure_server_raises_if_port_taken(self):
+ def test_register_signal_replace_fails_when_replace_is_false(self):
"""
- Verify that server raises an exception if port is already taken.
+ Make sure clients trying to replace already registered callbacks fail
+ when replace=False
"""
- # get a random free port
- while True:
- port = random.randint(1024, 65535)
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.connect(('localhost', port))
- s.close()
- except:
- break
-
- class PortBlocker(threading.Thread):
-
- def run(self):
- conns = 0
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.bind(('localhost', port))
- s.setblocking(1)
- s.listen(1)
- while conns < 2: # blocks until rece
- conns += 1
- s.accept()
- s.close()
-
- # block the port
- taker = PortBlocker()
- taker.start()
- time.sleep(1) # wait for thread to start.
+ event = catalog.CLIENT_UID
+ self._client.register(event, lambda event, _: None, uid=1)
self.assertRaises(
- server.PortAlreadyTaken, server.ensure_server, port)
+ CallbackAlreadyRegisteredError,
+ self._client.register,
+ event, lambda event, _: None, uid=1, replace=False)
- def test_async_register(self):
+ def test_register_more_than_one_callback_works(self):
"""
- Test asynchronous registering of callbacks.
+ Make sure clients can replace already registered callbacks.
"""
- flag = Mock()
+ event = catalog.CLIENT_UID
+ d1 = defer.Deferred()
+
+ def cbk1(event, _):
+ return callFromThread(d1.callback, event)
- # executed after async register, when response is received from server
- def reqcbk(request, response):
- flag(request.event, response.status)
+ d2 = defer.Deferred()
- # callback registered by application
- def callback(request):
- pass
+ def cbk2(event, _):
+ return d2.callback(event)
- # passing a callback as reqcbk param makes the call asynchronous
- result = events.register(CLIENT_UID, callback, reqcbk=reqcbk)
- self.assertIsNone(result)
- events.signal(CLIENT_UID)
- time.sleep(1) # wait for signal to arrive from server
- flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
+ self._client.register(event, cbk1)
+ self._client.register(event, cbk2)
+ self._client.emit(event, None)
+ d = defer.gatherResults([d1, d2])
+ return d
- def test_async_signal(self):
+ def test_client_receives_signal(self):
"""
- Test asynchronous registering of callbacks.
+ Ensure clients can receive signals.
"""
- flag = Mock()
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(request.event, response.status)
+ def cbk(events, _):
+ callFromThread(d.callback, event)
- # passing a callback as reqcbk param makes the call asynchronous
- result = events.signal(CLIENT_UID, reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for signal to arrive from server
- flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
+ self._client.register(event, cbk)
+ self._client.emit(event, None)
+ return d
- def test_async_unregister(self):
+ def test_client_unregister_all(self):
"""
- Test asynchronous unregistering of callbacks.
+ Test that the client can unregister all events for one signal.
"""
- flag = Mock()
+ event1 = catalog.CLIENT_UID
+ d = defer.Deferred()
+ # register more than one callback for the same event
+ self._client.register(
+ event1, lambda ev, _: callFromThread(d.errback, None))
+ self._client.register(
+ event1, lambda ev, _: callFromThread(d.errback, None))
+ # unregister and emit the event
+ self._client.unregister(event1)
+ self._client.emit(event1, None)
+ # register and emit another event so the deferred can succeed
+ event2 = catalog.CLIENT_SESSION_ID
+ self._client.register(
+ event2, lambda ev, _: callFromThread(d.callback, None))
+ self._client.emit(event2, None)
+ return d
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(request.event, response.status)
-
- # callback registered by application
- def callback(request):
- pass
-
- # passing a callback as reqcbk param makes the call asynchronous
- events.register(CLIENT_UID, callback)
- result = events.unregister(CLIENT_UID, reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for signal to arrive from server
- flag.assert_called_once_with(CLIENT_UID, EventResponse.OK)
-
- def test_async_ping_server(self):
+ def test_client_unregister_by_uid(self):
"""
- Test asynchronous pinging of server.
+ Test that the client can unregister an event by uid.
"""
- flag = Mock()
+ event = catalog.CLIENT_UID
+ d = defer.Deferred()
+ # register one callback that would fail
+ uid = self._client.register(
+ event, lambda ev, _: callFromThread(d.errback, None))
+ # register one callback that will succeed
+ self._client.register(
+ event, lambda ev, _: callFromThread(d.callback, None))
+ # unregister by uid and emit the event
+ self._client.unregister(event, uid=uid)
+ self._client.emit(event, None)
+ return d
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(response.status)
- result = events.ping_server(reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for response to arrive from server.
- flag.assert_called_once_with(EventResponse.OK)
+class EventsTxClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
+
+ _client = txclient
- def test_async_ping_client(self):
- """
- Test asynchronous pinging of client.
- """
- flag = Mock()
- # executed after async signal, when response is received from server
- def reqcbk(request, response):
- flag(response.status)
+class EventsClientTestCase(EventsGenericClientTestCase, unittest.TestCase):
- daemon = client.ensure_client_daemon()
- result = events.ping_client(daemon.get_port(), reqcbk=reqcbk)
- self.assertIsNone(result)
- time.sleep(1) # wait for response to arrive from server.
- flag.assert_called_once_with(EventResponse.OK)
+ _client = client
diff --git a/src/leap/common/tests/test_http.py b/src/leap/common/tests/test_http.py
new file mode 100644
index 0000000..f44550f
--- /dev/null
+++ b/src/leap/common/tests/test_http.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+# test_http.py
+# Copyright (C) 2013 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Tests for:
+ * leap/common/http.py
+"""
+import os
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
+from leap.common import http
+from leap.common.testing.basetest import BaseLeapTest
+
+TEST_CERT_PEM = os.path.join(
+ os.path.split(__file__)[0],
+ '..', 'testing', "leaptest_combined_keycert.pem")
+
+
+class HTTPClientTest(BaseLeapTest):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_agents_sharing_pool_by_default(self):
+ client = http.HTTPClient()
+ client2 = http.HTTPClient(TEST_CERT_PEM)
+ self.assertNotEquals(
+ client._agent, client2._agent, "Expected dedicated agents")
+ self.assertEquals(
+ client._agent._pool, client2._agent._pool,
+ "Pool was not reused by default")
+
+ def test_agent_can_have_dedicated_custom_pool(self):
+ custom_pool = http._HTTPConnectionPool(
+ None,
+ timeout=10,
+ maxPersistentPerHost=42,
+ persistent=False
+ )
+ self.assertEquals(custom_pool.maxPersistentPerHost, 42,
+ "Custom persistent connections "
+ "limit is not being respected")
+ self.assertFalse(custom_pool.persistent,
+ "Custom persistence is not being respected")
+ default_client = http.HTTPClient()
+ custom_client = http.HTTPClient(pool=custom_pool)
+
+ self.assertNotEquals(
+ default_client._agent, custom_client._agent,
+ "No agent reuse is expected")
+ self.assertEquals(
+ custom_pool, custom_client._agent._pool,
+ "Custom pool usage was not respected")
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/src/leap/common/zmq_utils.py b/src/leap/common/zmq_utils.py
new file mode 100644
index 0000000..0a781de
--- /dev/null
+++ b/src/leap/common/zmq_utils.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+# zmq.py
+# Copyright (C) 2013, 2014 LEAP
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Utilities to handle ZMQ certificates.
+"""
+import os
+import logging
+import stat
+import shutil
+
+import zmq
+
+try:
+ import zmq.auth
+except ImportError:
+ pass
+
+from leap.common.files import mkdir_p
+from leap.common.check import leap_assert
+
+logger = logging.getLogger(__name__)
+
+
+KEYS_PREFIX = "zmq_certificates"
+PUBLIC_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "public_keys")
+PRIVATE_KEYS_PREFIX = os.path.join(KEYS_PREFIX, "private_keys")
+
+
+def zmq_has_curve():
+ """
+ Return whether the current ZMQ has support for auth and CurveZMQ security.
+
+ :rtype: bool
+
+ Version notes:
+ `zmq.curve_keypair()` is new in version 14.0, new in version libzmq-4.0.
+ Requires libzmq (>= 4.0) to have been linked with libsodium.
+ `zmq.auth` module is new in version 14.1
+ `zmq.has()` is new in version 14.1, new in version libzmq-4.1.
+ """
+ zmq_version = zmq.zmq_version_info()
+ pyzmq_version = zmq.pyzmq_version_info()
+
+ if pyzmq_version >= (14, 1, 0) and zmq_version >= (4, 1):
+ return zmq.has('curve')
+
+ if pyzmq_version < (14, 1, 0):
+ return False
+
+ if zmq_version < (4, 0):
+ # security is new in libzmq 4.0
+ return False
+
+ try:
+ zmq.curve_keypair()
+ except zmq.error.ZMQError:
+ # security requires libzmq to be linked against libsodium
+ return False
+
+ return True
+
+
+def assert_zmq_has_curve():
+ leap_assert(zmq_has_curve, "CurveZMQ not supported!")
+
+
+def maybe_create_and_get_certificates(basedir, name):
+ """
+ Generate the needed ZMQ certificates for backend/frontend communication if
+ needed.
+ """
+ assert_zmq_has_curve()
+ private_keys_dir = os.path.join(basedir, PRIVATE_KEYS_PREFIX)
+ private_key = os.path.join(
+ private_keys_dir, name + ".key_secret")
+ if not os.path.isfile(private_key):
+ mkdir_p(private_keys_dir)
+ zmq.auth.create_certificates(private_keys_dir, name)
+ # set permissions to: 0700 (U:rwx G:--- O:---)
+ os.chmod(private_key, stat.S_IRUSR | stat.S_IWUSR)
+ # move public key to public keys directory
+ public_keys_dir = os.path.join(basedir, PUBLIC_KEYS_PREFIX)
+ old_public_key = os.path.join(
+ private_keys_dir, name + ".key")
+ new_public_key = os.path.join(
+ public_keys_dir, name + ".key")
+ mkdir_p(public_keys_dir)
+ shutil.move(old_public_key, new_public_key)
+ return zmq.auth.load_certificate(private_key)