diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/leap/common/__init__.py | 3 | ||||
-rw-r--r-- | src/leap/common/config/__init__.py | 0 | ||||
-rw-r--r-- | src/leap/common/config/baseconfig.py | 186 | ||||
-rw-r--r-- | src/leap/common/config/pluggableconfig.py | 475 | ||||
-rw-r--r-- | src/leap/common/config/prefixers.py | 132 | ||||
-rw-r--r-- | src/leap/common/events/Makefile | 31 | ||||
-rw-r--r-- | src/leap/common/events/README | 45 | ||||
-rw-r--r-- | src/leap/common/events/__init__.py | 100 | ||||
-rw-r--r-- | src/leap/common/events/component.py | 238 | ||||
-rw-r--r-- | src/leap/common/events/daemon.py | 208 | ||||
-rw-r--r-- | src/leap/common/events/events.proto | 69 | ||||
-rw-r--r-- | src/leap/common/events/events_pb2.py | 364 | ||||
-rw-r--r-- | src/leap/common/events/mac_auth.py | 31 | ||||
-rw-r--r-- | src/leap/common/events/server.py | 149 | ||||
-rw-r--r-- | src/leap/common/tests/__init__.py | 0 | ||||
-rw-r--r-- | src/leap/common/tests/test_events.py | 200 |
16 files changed, 2230 insertions, 1 deletions
diff --git a/src/leap/common/__init__.py b/src/leap/common/__init__.py index 9467c46..5702ca1 100644 --- a/src/leap/common/__init__.py +++ b/src/leap/common/__init__.py @@ -3,6 +3,7 @@ import logging from leap.common import certs from leap.common import check from leap.common import files +from leap.common import events logger = logging.getLogger(__name__) @@ -13,6 +14,6 @@ except ImportError: logger.debug('PyGeoIP not found. Disabled Geo support.') HAS_GEOIP = False -__all__ = ["certs", "check", "files"] +__all__ = ["certs", "check", "files", "events"] __version__ = "0.2.0-dev" diff --git a/src/leap/common/config/__init__.py b/src/leap/common/config/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/common/config/__init__.py diff --git a/src/leap/common/config/baseconfig.py b/src/leap/common/config/baseconfig.py new file mode 100644 index 0000000..edb9b24 --- /dev/null +++ b/src/leap/common/config/baseconfig.py @@ -0,0 +1,186 @@ +# -*- coding: utf-8 -*- +# baseconfig.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Implements the abstract base class for configuration +""" + +import copy +import logging +import functools +import os + +from abc import ABCMeta, abstractmethod + +from leap.common.check import leap_assert +from leap.common.files import mkdir_p +from leap.common.config.pluggableconfig import PluggableConfig +from leap.common.config.prefixers import get_platform_prefixer + +logger = logging.getLogger(__name__) + + +class BaseConfig: + """ + Abstract base class for any JSON based configuration + """ + + __metaclass__ = ABCMeta + + """ + Standalone is a class wide parameter + + @param standalone: if True it will return the prefix for a + standalone application. Otherwise, it will return the system + default for configuration storage. + @type standalone: bool + """ + standalone = False + + def __init__(self): + self._data = {} + self._config_checker = None + + @abstractmethod + def _get_spec(self): + """ + Returns the spec object for the specific configuration + """ + return None + + def _safe_get_value(self, key): + """ + Tries to return a value only if the config has already been loaded + + @rtype: depends on the config structure, dict, str, array, int + @return: returns the value for the specified key in the config + """ + leap_assert(self._config_checker, "Load the config first") + return self._config_checker.config[key] + + def get_path_prefix(self): + """ + Returns the platform dependant path prefixer + + """ + return get_platform_prefixer().get_path_prefix( + standalone=self.standalone) + + def loaded(self): + """ + Returns True if the configuration has been already + loaded. False otherwise + """ + return self._config_checker is not None + + def save(self, path_list): + """ + Saves the current configuration to disk + + @param path_list: list of components that form the relative + path to configuration. The absolute path will be calculated + depending on the platform. + @type path_list: list + + @return: True if saved to disk correctly, False otherwise + """ + config_path = os.path.join(self.get_path_prefix(), *(path_list[:-1])) + mkdir_p(config_path) + + try: + self._config_checker.serialize(os.path.join(config_path, + path_list[-1])) + except Exception as e: + logger.warning("%s" % (e,)) + raise + return True + + def load(self, path="", data=None, mtime=None): + """ + Loads the configuration from disk + + @type path: str + @param path: relative path to configuration. The absolute path + will be calculated depending on the platform + + @return: True if loaded from disk correctly, False otherwise + """ + + config_path = os.path.join(self.get_path_prefix(), + path) + + self._config_checker = PluggableConfig(format="json") + self._config_checker.options = copy.deepcopy(self._get_spec()) + + try: + if data is None: + self._config_checker.load(fromfile=config_path, mtime=mtime) + else: + self._config_checker.load(data, mtime=mtime) + except Exception as e: + logger.warning("Something went wrong while loading " + + "the config from %s\n%s" % (config_path, e)) + self._config_checker = None + return False + return True + + +class LocalizedKey(object): + """ + Decorator used for keys that are localized in a configuration + """ + + def __init__(self, func, **kwargs): + self._func = func + + def __call__(self, instance, lang="en"): + """ + Tries to return the string for the specified language, otherwise + informs the problem and returns an empty string + + @param lang: language code + @type lang: str + + @return: localized value from the possible values returned by + self._func + """ + descriptions = self._func(instance) + description_lang = "" + config_lang = "en" + for key in descriptions.keys(): + if lang.startswith(key): + config_lang = key + break + + description_lang = descriptions[config_lang] + return description_lang + + def __get__(self, instance, instancetype): + """ + Implement the descriptor protocol to make decorating instance + method possible. + """ + # Return a partial function with the first argument is the instance + # of the class decorated. + return functools.partial(self.__call__, instance) + +if __name__ == "__main__": + try: + config = BaseConfig() # should throw TypeError for _get_spec + except Exception as e: + assert isinstance(e, TypeError), "Something went wrong" + print "Abstract BaseConfig class is working as expected" diff --git a/src/leap/common/config/pluggableconfig.py b/src/leap/common/config/pluggableconfig.py new file mode 100644 index 0000000..8535fa6 --- /dev/null +++ b/src/leap/common/config/pluggableconfig.py @@ -0,0 +1,475 @@ +# -*- coding: utf-8 -*- +# pluggableconfig.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +generic configuration handlers +""" +import copy +import json +import logging +import os +import time +import urlparse + +import jsonschema + +#from leap.base.util.translations import LEAPTranslatable +from leap.common.check import leap_assert + + +logger = logging.getLogger(__name__) + + +__all__ = ['PluggableConfig', + 'adaptors', + 'types', + 'UnknownOptionException', + 'MissingValueException', + 'ConfigurationProviderException', + 'TypeCastException'] + +# exceptions + + +class UnknownOptionException(Exception): + """exception raised when a non-configuration + value is present in the configuration""" + + +class MissingValueException(Exception): + """exception raised when a required value is missing""" + + +class ConfigurationProviderException(Exception): + """exception raised when a configuration provider is missing, etc""" + + +class TypeCastException(Exception): + """exception raised when a + configuration item cannot be coerced to a type""" + + +class ConfigAdaptor(object): + """ + abstract base class for config adaotors for + serialization/deserialization and custom validation + and type casting. + """ + def read(self, filename): + raise NotImplementedError("abstract base class") + + def write(self, config, filename): + with open(filename, 'w') as f: + self._write(f, config) + + def _write(self, fp, config): + raise NotImplementedError("abstract base class") + + def validate(self, config, schema): + raise NotImplementedError("abstract base class") + + +adaptors = {} + + +class JSONSchemaEncoder(json.JSONEncoder): + """ + custom default encoder that + casts python objects to json objects for + the schema validation + """ + def default(self, obj): + if obj is str: + return 'string' + if obj is unicode: + return 'string' + if obj is int: + return 'integer' + if obj is list: + return 'array' + if obj is dict: + return 'object' + if obj is bool: + return 'boolean' + + +class JSONAdaptor(ConfigAdaptor): + indent = 2 + extensions = ['json'] + + def read(self, _from): + if isinstance(_from, file): + _from_string = _from.read() + if isinstance(_from, str): + _from_string = _from + return json.loads(_from_string) + + def _write(self, fp, config): + fp.write(json.dumps(config, + indent=self.indent, + sort_keys=True)) + + def validate(self, config, schema_obj): + schema_json = JSONSchemaEncoder().encode(schema_obj) + schema = json.loads(schema_json) + jsonschema.validate(config, schema) + + +adaptors['json'] = JSONAdaptor() + +# +# Adaptors +# +# Allow to apply a predefined set of types to the +# specs, so it checks the validity of formats and cast it +# to proper python types. + +# TODO: +# - HTTPS uri + + +class DateType(object): + fmt = '%Y-%m-%d' + + def to_python(self, data): + return time.strptime(data, self.fmt) + + def get_prep_value(self, data): + return time.strftime(self.fmt, data) + + +class TranslatableType(object): + """ + a type that casts to LEAPTranslatable objects. + Used for labels we get from providers and stuff. + """ + + def to_python(self, data): + # TODO: add translatable + return data # LEAPTranslatable(data) + + # needed? we already have an extended dict... + #def get_prep_value(self, data): + #return dict(data) + + +class URIType(object): + + def to_python(self, data): + parsed = urlparse.urlparse(data) + if not parsed.scheme: + raise TypeCastException("uri %s has no schema" % data) + return parsed.geturl() + + def get_prep_value(self, data): + return data + + +class HTTPSURIType(object): + + def to_python(self, data): + parsed = urlparse.urlparse(data) + if not parsed.scheme: + raise TypeCastException("uri %s has no schema" % data) + if parsed.scheme != "https": + raise TypeCastException( + "uri %s does not has " + "https schema" % data) + return parsed.geturl() + + def get_prep_value(self, data): + return data + + +types = { + 'date': DateType(), + 'uri': URIType(), + 'https-uri': HTTPSURIType(), + 'translatable': TranslatableType(), +} + + +class PluggableConfig(object): + + options = {} + + def __init__(self, + adaptors=adaptors, + types=types, + format=None): + + self.config = {} + self.adaptors = adaptors + self.types = types + self._format = format + self.mtime = None + self.dirty = False + + @property + def option_dict(self): + if hasattr(self, 'options') and isinstance(self.options, dict): + return self.options.get('properties', None) + + def items(self): + """ + act like an iterator + """ + if isinstance(self.option_dict, dict): + return self.option_dict.items() + return self.options + + def validate(self, config, format=None): + """ + validate config + """ + schema = self.options + if format is None: + format = self._format + + if format: + adaptor = self.get_adaptor(self._format) + adaptor.validate(config, schema) + else: + # we really should make format mandatory... + logger.error('no format passed to validate') + + # first round of validation is ok. + # now we proceed to cast types if any specified. + self.to_python(config) + + def to_python(self, config): + """ + cast types following first type and then format indications. + """ + unseen_options = [i for i in config if i not in self.option_dict] + if unseen_options: + raise UnknownOptionException( + "Unknown options: %s" % ', '.join(unseen_options)) + + for key, value in config.items(): + _type = self.option_dict[key].get('type') + if _type is None and 'default' in self.option_dict[key]: + _type = type(self.option_dict[key]['default']) + if _type is not None: + tocast = True + if not callable(_type) and isinstance(value, _type): + tocast = False + if tocast: + try: + config[key] = _type(value) + except BaseException, e: + raise TypeCastException( + "Could not coerce %s, %s, " + "to type %s: %s" % (key, value, _type.__name__, e)) + _format = self.option_dict[key].get('format', None) + _ftype = self.types.get(_format, None) + if _ftype: + try: + config[key] = _ftype.to_python(value) + except BaseException, e: + raise TypeCastException( + "Could not coerce %s, %s, " + "to format %s: %s" % (key, value, + _ftype.__class__.__name__, + e)) + + return config + + def prep_value(self, config): + """ + the inverse of to_python method, + called just before serialization + """ + for key, value in config.items(): + _format = self.option_dict[key].get('format', None) + _ftype = self.types.get(_format, None) + if _ftype and hasattr(_ftype, 'get_prep_value'): + try: + config[key] = _ftype.get_prep_value(value) + except BaseException, e: + raise TypeCastException( + "Could not serialize %s, %s, " + "by format %s: %s" % (key, value, + _ftype.__class__.__name__, + e)) + else: + config[key] = value + return config + + # methods for adding configuration + + def get_default_values(self): + """ + return a config options from configuration defaults + """ + defaults = {} + for key, value in self.items(): + if 'default' in value: + defaults[key] = value['default'] + return copy.deepcopy(defaults) + + def get_adaptor(self, format): + """ + get specified format adaptor or + guess for a given filename + """ + adaptor = self.adaptors.get(format, None) + if adaptor: + return adaptor + + # not registered in adaptors dict, let's try all + for adaptor in self.adaptors.values(): + if format in adaptor.extensions: + return adaptor + + def filename2format(self, filename): + extension = os.path.splitext(filename)[-1] + return extension.lstrip('.') or None + + def serialize(self, filename, format=None, full=False): + if not format: + format = self._format + if not format: + format = self.filename2format(filename) + if not format: + raise Exception('Please specify a format') + # TODO: more specific exception type + + adaptor = self.get_adaptor(format) + if not adaptor: + raise Exception("Adaptor not found for format: %s" % format) + + config = copy.deepcopy(self.config) + serializable = self.prep_value(config) + adaptor.write(serializable, filename) + + if self.mtime: + self.touch_mtime(filename) + + def touch_mtime(self, filename): + mtime = self.mtime + os.utime(filename, (mtime, mtime)) + + def deserialize(self, string=None, fromfile=None, format=None): + """ + load configuration from a file or string + """ + + def _try_deserialize(): + if fromfile: + with open(fromfile, 'r') as f: + content = adaptor.read(f) + elif string: + content = adaptor.read(string) + return content + + # XXX cleanup this! + + if fromfile: + leap_assert(os.path.exists(fromfile)) + if not format: + format = self.filename2format(fromfile) + + if not format: + format = self._format + if format: + adaptor = self.get_adaptor(format) + else: + adaptor = None + + if adaptor: + content = _try_deserialize() + return content + + # no adaptor, let's try rest of adaptors + + adaptors = self.adaptors[:] + + if format: + adaptors.sort( + key=lambda x: int( + format in x.extensions), + reverse=True) + + for adaptor in adaptors: + content = _try_deserialize() + return content + + def set_dirty(self): + self.dirty = True + + def is_dirty(self): + return self.dirty + + def load(self, *args, **kwargs): + """ + load from string or file + if no string of fromfile option is given, + it will attempt to load from defaults + defined in the schema. + """ + string = args[0] if args else None + fromfile = kwargs.get("fromfile", None) + mtime = kwargs.pop("mtime", None) + self.mtime = mtime + content = None + + # start with defaults, so we can + # have partial values applied. + content = self.get_default_values() + if string and isinstance(string, str): + content = self.deserialize(string) + + if not string and fromfile is not None: + #import ipdb;ipdb.set_trace() + content = self.deserialize(fromfile=fromfile) + + if not content: + logger.error('no content could be loaded') + # XXX raise! + return + + # lazy evaluation until first level of nesting + # to allow lambdas with context-dependant info + # like os.path.expanduser + for k, v in content.iteritems(): + if callable(v): + content[k] = v() + + self.validate(content) + self.config = content + return True + + +def testmain(): # pragma: no cover + + from tests import test_validation as t + import pprint + + config = PluggableConfig(_format="json") + properties = copy.deepcopy(t.sample_spec) + + config.options = properties + config.load(fromfile='data.json') + + print 'config' + pprint.pprint(config.config) + + config.serialize('/tmp/testserial.json') + +if __name__ == "__main__": + testmain() diff --git a/src/leap/common/config/prefixers.py b/src/leap/common/config/prefixers.py new file mode 100644 index 0000000..27274bd --- /dev/null +++ b/src/leap/common/config/prefixers.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# prefixers.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Platform dependant configuration path prefixers +""" +import os +import platform + +from abc import ABCMeta, abstractmethod +from xdg import BaseDirectory + +from leap.common.check import leap_assert + + +class Prefixer: + """ + Abstract prefixer class + """ + + __metaclass__ = ABCMeta + + @abstractmethod + def get_path_prefix(self, standalone=False): + """ + Returns the platform dependant path prefixer + + @param standalone: if True it will return the prefix for a + standalone application. Otherwise, it will return the system + default for configuration storage. + @type standalone: bool + """ + return "" + + +def get_platform_prefixer(): + prefixer = globals()[platform.system() + "Prefixer"] + leap_assert(prefixer, "Unimplemented platform prefixer: %s" % + (platform.system(),)) + return prefixer() + + +class LinuxPrefixer(Prefixer): + """ + Config prefixer for the Linux platform + """ + + def get_path_prefix(self, standalone=False): + """ + Returns the platform dependant path prefixer. + This method expects an env variable named LEAP_CLIENT_PATH if + standalone is used. + + @param standalone: if True it will return the prefix for a + standalone application. Otherwise, it will return the system + default for configuration storage. + @type standalone: bool + """ + config_dir = BaseDirectory.xdg_config_home + if not standalone: + return config_dir + return os.path.join(os.getcwd(), "config") + + +class DarwinPrefixer(Prefixer): + """ + Config prefixer for the Darwin platform + """ + + def get_path_prefix(self, standalone=False): + """ + Returns the platform dependant path prefixer. + This method expects an env variable named LEAP_CLIENT_PATH if + standalone is used. + + @param standalone: if True it will return the prefix for a + standalone application. Otherwise, it will return the system + default for configuration storage. + @type standalone: bool + """ + config_dir = BaseDirectory.xdg_config_home + if not standalone: + return config_dir + return os.getenv(os.getcwd(), "config") + + +class WindowsPrefixer(Prefixer): + """ + Config prefixer for the Windows platform + """ + + def get_path_prefix(self, standalone=False): + """ + Returns the platform dependant path prefixer. + This method expects an env variable named LEAP_CLIENT_PATH if + standalone is used. + + @param standalone: if True it will return the prefix for a + standalone application. Otherwise, it will return the system + default for configuration storage. + @type standalone: bool + """ + config_dir = BaseDirectory.xdg_config_home + + if not standalone: + return config_dir + return os.path.join(os.getcwd(), "config") + +if __name__ == "__main__": + try: + abs_prefixer = Prefixer() + except Exception as e: + assert isinstance(e, TypeError), "Something went wrong" + print "Abstract Prefixer class is working as expected" + + linux_prefixer = LinuxPrefixer() + print linux_prefixer.get_path_prefix(standalone=True) + print linux_prefixer.get_path_prefix() diff --git a/src/leap/common/events/Makefile b/src/leap/common/events/Makefile new file mode 100644 index 0000000..4f73dea --- /dev/null +++ b/src/leap/common/events/Makefile @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# Makefile +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# This file is used to generate protobuf python files that are used for IPC: +# +# https://developers.google.com/protocol-buffers/docs/pythontutorial + +PROTOC = protoc + +all: events_pb2.py + +%_pb2.py: %.proto + $(PROTOC) --python_out=./ $< + autopep8 --in-place --aggressive $@ + +clean: + rm -f *_pb2.py diff --git a/src/leap/common/events/README b/src/leap/common/events/README new file mode 100644 index 0000000..61b320d --- /dev/null +++ b/src/leap/common/events/README @@ -0,0 +1,45 @@ +Events mechanism +================ + +The events mechanism allows for "components" to send signal events to each +other by means of a centralized server. Components can register with the +server to receive signals of certain types, and they can also send signals to +the server that will then redistribute these signals to registered components. + + +Listening daemons +----------------- + +Both components and the server listen for incoming messages by using a +listening daemon that runs in its own thread. The server daemon has to be +started explicitly, while components daemon will be started whenever a +component registers with the server to receive messages. + + +How to use it +------------- + +To start the events server: + +>>> from leap.common.events import server +>>> server.ensure_server(port=8090) + +To register a callback to be called when a given signal is raised: + +>>> from leap.common.events import ( +>>> register, +>>> events_pb2 as proto, +>>> ) +>>> +>>> def mycallback(sigreq): +>>> print str(sigreq) +>>> +>>> events.register(signal=proto.CLIENT_UID, callback=mycallback) + +To signal an event: + +>>> from leap.common.events import ( +>>> signal, +>>> events_pb2 as proto, +>>> ) +>>> signal(proto.CLIENT_UID) diff --git a/src/leap/common/events/__init__.py b/src/leap/common/events/__init__.py new file mode 100644 index 0000000..c949080 --- /dev/null +++ b/src/leap/common/events/__init__.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# __init__.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +An events mechanism that allows for signaling of events between components. +""" + +import logging +import socket + + +from leap.common.events import ( + events_pb2, + server, + component, + daemon, +) + + +logger = logging.getLogger(__name__) + + +def register(signal, callback, uid=None, replace=False, reqcbk=None, + timeout=1000): + """ + Register a callback to be called when the given signal is received. + + Will timeout after timeout ms if response has not been received. The + timeout arg is only used for asynch requests. If a reqcbk callback has + been supplied the timeout arg is not used. The response value will be + returned for a synch request but nothing will be returned for an asynch + request. + + @param signal: the signal that causes the callback to be launched + @type signal: int (see the `events.proto` file) + @param callback: the callback to be called when the signal is received + @type callback: function + @param uid: a unique id for the callback + @type uid: int + @param replace: should an existent callback with same uid be replaced? + @type replace: bool + @param reqcbk: a callback to be called when a response from server is + received + @type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + @param timeout: the timeout for synch calls + @type timeout: int + + @return: the response from server for synch calls or nothing for asynch + calls + @rtype: leap.common.events.events_pb2.EventsResponse or None + """ + return component.register(signal, callback, uid, replace, reqcbk, timeout) + + +def signal(signal, content="", mac_method="", mac="", reqcbk=None, + timeout=1000): + """ + Send `signal` event to events server. + + Will timeout after timeout ms if response has not been received. The + timeout arg is only used for asynch requests. If a reqcbk callback has + been supplied the timeout arg is not used. The response value will be + returned for a synch request but nothing will be returned for an asynch + request. + + @param signal: the signal that causes the callback to be launched + @type signal: int (see the `events.proto` file) + @param content: the contents of the event signal + @type content: str + @param mac_method: the method used to auth mac + @type mac_method: str + @param mac: the content of the auth mac + @type mac: str + @param reqcbk: a callback to be called when a response from server is + received + @type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + @param timeout: the timeout for synch calls + @type timeout: int + + @return: the response from server for synch calls or nothing for asynch + calls + @rtype: leap.common.events.events_pb2.EventsResponse or None + """ + return component.signal(signal, content, mac_method, mac, reqcbk, timeout) diff --git a/src/leap/common/events/component.py b/src/leap/common/events/component.py new file mode 100644 index 0000000..0cf0e38 --- /dev/null +++ b/src/leap/common/events/component.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- +# component.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +The component end point of the events mechanism. + +Components are the communicating parties of the events mechanism. They +communicate by sending messages to a server, which in turn redistributes +messages to other components. + +When a component registers a callback for a given signal, it also tells the +server that it wants to be notified whenever signals of that type are sent by +some other component. +""" + + +import logging +import threading + + +from protobuf.socketrpc import RpcService +from leap.common.events import ( + events_pb2 as proto, + server, + daemon, + mac_auth, +) + + +logger = logging.getLogger(__name__) + + +# the `registered_callbacks` dictionary below should have the following +# format: +# +# { event_signal: [ (uid, callback), ... ], ... } +# +registered_callbacks = {} + + +class CallbackAlreadyRegistered(Exception): + """ + Raised when trying to register an already registered callback. + """ + + +def ensure_component_daemon(): + """ + Ensure the component daemon is running and listening for incoming + messages. + + @return: the daemon instance + @rtype: EventsComponentDaemon + """ + import time + daemon = EventsComponentDaemon.ensure(0) + logger.debug('ensure component daemon') + + # Because we use a random port we want to wait until a port is assigned to + # local component daemon. + + while not (EventsComponentDaemon.get_instance() and + EventsComponentDaemon.get_instance().get_port()): + time.sleep(0.1) + return daemon + + +def register(signal, callback, uid=None, replace=False, reqcbk=None, + timeout=1000): + """ + Registers a callback to be called when a specific signal event is + received. + + Will timeout after timeout ms if response has not been received. The + timeout arg is only used for asynch requests. If a reqcbk callback has + been supplied the timeout arg is not used. The response value will be + returned for a synch request but nothing will be returned for an asynch + request. + + @param signal: the signal that causes the callback to be launched + @type signal: int (see the `events.proto` file) + @param callback: the callback to be called when the signal is received + @type callback: function + callback(leap.common.events.events_pb2.SignalRequest) + @param uid: a unique id for the callback + @type uid: int + @param replace: should an existent callback with same uid be replaced? + @type replace: bool + @param reqcbk: a callback to be called when a response from server is + received + @type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + @param timeout: the timeout for synch calls + @type timeout: int + + Might raise a CallbackAlreadyRegistered exception if there's already a + callback identified by the given uid and replace is False. + + @return: the response from server for synch calls or nothing for asynch + calls + @rtype: leap.common.events.events_pb2.EventsResponse or None + """ + ensure_component_daemon() # so we can receive registered signals + # register callback locally + if signal not in registered_callbacks: + registered_callbacks[signal] = [] + cbklist = registered_callbacks[signal] + if uid and filter(lambda (x, y): x == uid, cbklist): + if not replace: + raise CallbackAlreadyRegisteredException() + else: + registered_callbacks[signal] = filter(lambda(x, y): x != uid, + cbklist) + registered_callbacks[signal].append((uid, callback)) + # register callback on server + request = proto.RegisterRequest() + request.event = signal + request.port = EventsComponentDaemon.get_instance().get_port() + request.mac_method = mac_auth.MacMethod.MAC_NONE + request.mac = "" + service = RpcService(proto.EventsServerService_Stub, + server.SERVER_PORT, 'localhost') + logger.info("Sending registration request to server on port %s: %s", + server.SERVER_PORT, + str(request)) + return service.register(request, callback=reqcbk, timeout=timeout) + + +def signal(signal, content="", mac_method="", mac="", reqcbk=None, + timeout=1000): + """ + Send `signal` event to events server. + + Will timeout after timeout ms if response has not been received. The + timeout arg is only used for asynch requests. If a reqcbk callback has + been supplied the timeout arg is not used. The response value will be + returned for a synch request but nothing will be returned for an asynch + request. + + @param signal: the signal that causes the callback to be launched + @type signal: int (see the `events.proto` file) + @param content: the contents of the event signal + @type content: str + @param mac_method: the method used for auth mac + @type mac_method: str + @param mac: the content of the auth mac + @type mac: str + @param reqcbk: a callback to be called when a response from server is + received + @type reqcbk: function + callback(leap.common.events.events_pb2.EventResponse) + @param timeout: the timeout for synch calls + @type timeout: int + + @return: the response from server for synch calls or nothing for asynch + calls + @rtype: leap.common.events.events_pb2.EventsResponse or None + """ + request = proto.SignalRequest() + request.event = signal + request.content = content + request.mac_method = mac_method + request.mac = mac + service = RpcService(proto.EventsServerService_Stub, server.SERVER_PORT, + 'localhost') + logger.info("Sending signal to server: %s", str(request)) + return service.signal(request, callback=reqcbk, timeout=timeout) + + +class EventsComponentService(proto.EventsComponentService): + """ + Service for receiving signal events in components. + """ + + def __init__(self): + proto.EventsComponentService.__init__(self) + + def signal(self, controller, request, done): + """ + Receive a signal and run callbacks registered for that signal. + + This method is called whenever a signal request is received from + server. + + @param controller: used to mediate a single method call + @type controller: protobuf.socketrpc.controller.SocketRpcController + @param request: the request received from the component + @type request: leap.common.events.events_pb2.SignalRequest + @param done: callback to be called when done + @type done: protobuf.socketrpc.server.Callback + """ + logger.info('Received signal from server: %s' % str(request)) + + # run registered callbacks + # TODO: verify authentication using mac in incoming message + if request.event in registered_callbacks: + for (_, cbk) in registered_callbacks[request.event]: + # callbacks should be prepared to receive a + # events_pb2.SignalRequest. + cbk(request) + + # send response back to server + response = proto.EventResponse() + response.status = proto.EventResponse.OK + done.run(response) + + +class EventsComponentDaemon(daemon.EventsSingletonDaemon): + """ + A daemon that listens for incoming events from server. + """ + + @classmethod + def ensure(cls, port): + """ + Make sure the daemon is running on the given port. + + @param port: the port in which the daemon should listen + @type port: int + + @return: a daemon instance + @rtype: EventsComponentDaemon + """ + return cls.ensure_service(port, EventsComponentService()) diff --git a/src/leap/common/events/daemon.py b/src/leap/common/events/daemon.py new file mode 100644 index 0000000..d2c7b9b --- /dev/null +++ b/src/leap/common/events/daemon.py @@ -0,0 +1,208 @@ +# -*- coding: utf-8 -*- +# daemon.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +A singleton daemon for running RPC services using protobuf.socketrpc. +""" + + +import logging +import threading + + +from protobuf.socketrpc.server import ( + SocketRpcServer, + ThreadedTCPServer, + SocketHandler, +) + + +logger = logging.getLogger(__name__) + + +class ServiceAlreadyRunningException(Exception): + """ + Raised whenever a service is already running in this process but someone + attemped to start it in a different port. + """ + + +class EventsRpcServer(SocketRpcServer): + """ + RPC server used in server and component interfaces to receive messages. + """ + + def __init__(self, port, host='localhost'): + """ + Initialize a RPC server. + + @param port: the port in which to listen for incoming messages + @type port: int + @param host: the address to bind to + @type host: str + """ + SocketRpcServer.__init__(self, port, host) + self._server = None + + def run(self): + """ + Run the server. + """ + logger.info('Running server on port %d.' % self.port) + # parent implementation does not hold the server instance, so we do it + # here. + self._server = ThreadedTCPServer((self.host, self.port), + SocketHandler, self) + # if we chose to use a random port, fetch the port number info. + if self.port is 0: + self.port = self._server.socket.getsockname()[1] + self._server.serve_forever() + + def stop(self): + """ + Stop the server. + """ + self._server.shutdown() + + +class EventsSingletonDaemon(threading.Thread): + """ + Singleton class for for launching and terminating a daemon. + + This class is used so every part of the mechanism that needs to listen for + messages can launch its own daemon (thread) to do the job. + """ + + # Singleton instance + __instance = None + + def __new__(cls, *args, **kwargs): + """ + Return a singleton instance if it exists or create and initialize one. + """ + if len(args) is not 2: + raise TypeError("__init__() takes exactly 2 arguments (%d given)" + % len(args)) + if cls.__instance is None: + cls.__instance = object.__new__( + EventsSingletonDaemon) + cls.__initialize(cls.__instance, args[0], args[1]) + return cls.__instance + + @staticmethod + def __initialize(self, port, service): + """ + Initialize a singleton daemon. + + This is a static method disguised as instance method that actually + does the initialization of the daemon instance. + + @param port: the port in which to listen for incoming messages + @type port: int + @param service: the service to provide in this daemon + @type service: google.protobuf.service.Service + """ + threading.Thread.__init__(self) + self._port = port + self._service = service + self._server = EventsRpcServer(self._port) + self._server.registerService(self._service) + self.daemon = True + + def __init__(self): + """ + Singleton placeholder initialization method. + + Initialization is made in __new__ so we can always return the same + instance upon object creation. + """ + pass + + @classmethod + def ensure(cls, port): + """ + Make sure the daemon instance is running. + + Each implementation of this method should call `self.ensure_service` + with the appropriate service from the `events.proto` definitions, and + return the daemon instance. + + @param port: the port in which the daemon should be listening + @type port: int + + @return: a daemon instance + @rtype: EventsSingletonDaemon + """ + raise NotImplementedError(self.ensure) + + @classmethod + def ensure_service(cls, port, service): + """ + Start the singleton instance if not already running. + + Might return ServiceAlreadyRunningException + + @param port: the port in which the daemon should be listening + @type port: int + + @return: a daemon instance + @rtype: EventsSingletonDaemon + """ + daemon = cls(port, service) + if not daemon.is_alive(): + daemon.start() + elif port and port != cls.__instance._port: + # service is running in this process but someone is trying to + # start it in another port + raise ServiceAlreadyRunningException( + "Service is already running in this process on port %d." + % self.__instance._port) + return daemon + + @classmethod + def get_instance(cls): + """ + Retrieve singleton instance of this daemon. + + @return: a daemon instance + @rtype: EventsSingletonDaemon + """ + return cls.__instance + + def run(self): + """ + Run the server. + """ + self._server.run() + + def stop(self): + """ + Stop the daemon. + """ + self._server.stop() + + def get_port(self): + """ + Retrieve the value of the port to which the service running in this + daemon is binded to. + + @return: the port to which the daemon is binded to + @rtype: int + """ + if self._port is 0: + self._port = self._server.port + return self._port diff --git a/src/leap/common/events/events.proto b/src/leap/common/events/events.proto new file mode 100644 index 0000000..29388b8 --- /dev/null +++ b/src/leap/common/events/events.proto @@ -0,0 +1,69 @@ +// signal.proto +// Copyright (C) 2013 LEA +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package leap.common.events; + +enum Event { + CLIENT_SESSION_ID = 1; + CLIENT_UID = 2; + SOLEDAD_CREATING_KEYS = 3; + SOLEDAD_DONE_CREATING_KEYS = 4; + SOLEDAD_UPLOADING_KEYS = 5; + SOLEDAD_DONE_UPLOADING_KEYS = 6; + SOLEDAD_DOWNLOADING_KEYS = 7; + SOLEDAD_DONE_DOWNLOADING_KEYS = 8; + SOLEDAD_NEW_DATA_TO_SYNC = 9; + SOLEDAD_DONE_DATA_SYNC = 10; + UPDATER_NEW_UPDATES = 11; + UPDATER_DONE_UPDATING = 12; +} + +message SignalRequest { + required Event event = 1; + required string content = 2; + required string mac_method = 3; + required bytes mac = 4; + optional string enc_method = 5; + optional bool error_occurred = 6; +} + +message RegisterRequest { + required Event event = 1; + required int32 port = 2; + required string mac_method = 3; + required bytes mac = 4; +} + +message EventResponse { + + enum Status { + OK = 1; + UNAUTH = 2; + ERROR = 3; + } + + required Status status = 1; + optional string result = 2; +} + +service EventsServerService { + rpc register(RegisterRequest) returns (EventResponse); + rpc signal(SignalRequest) returns (EventResponse); +} + +service EventsComponentService { + rpc signal(SignalRequest) returns (EventResponse); +} diff --git a/src/leap/common/events/events_pb2.py b/src/leap/common/events/events_pb2.py new file mode 100644 index 0000000..1d278cc --- /dev/null +++ b/src/leap/common/events/events_pb2.py @@ -0,0 +1,364 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! + +from google.protobuf import descriptor +from google.protobuf import message +from google.protobuf import reflection +from google.protobuf import service +from google.protobuf import service_reflection +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + + +DESCRIPTOR = descriptor.FileDescriptor( + name='events.proto', + package='leap.common.events', + serialized_pb='\n\x0c\x65vents.proto\x12\x12leap.common.events\"\x97\x01\n\rSignalRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0f\n\x07\x63ontent\x18\x02 \x02(\t\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\x12\x12\n\nenc_method\x18\x05 \x01(\t\x12\x16\n\x0e\x65rror_occurred\x18\x06 \x01(\x08\"j\n\x0fRegisterRequest\x12(\n\x05\x65vent\x18\x01 \x02(\x0e\x32\x19.leap.common.events.Event\x12\x0c\n\x04port\x18\x02 \x02(\x05\x12\x12\n\nmac_method\x18\x03 \x02(\t\x12\x0b\n\x03mac\x18\x04 \x02(\x0c\"\x82\x01\n\rEventResponse\x12\x38\n\x06status\x18\x01 \x02(\x0e\x32(.leap.common.events.EventResponse.Status\x12\x0e\n\x06result\x18\x02 \x01(\t\"\'\n\x06Status\x12\x06\n\x02OK\x10\x01\x12\n\n\x06UNAUTH\x10\x02\x12\t\n\x05\x45RROR\x10\x03*\xd5\x02\n\x05\x45vent\x12\x15\n\x11\x43LIENT_SESSION_ID\x10\x01\x12\x0e\n\nCLIENT_UID\x10\x02\x12\x19\n\x15SOLEDAD_CREATING_KEYS\x10\x03\x12\x1e\n\x1aSOLEDAD_DONE_CREATING_KEYS\x10\x04\x12\x1a\n\x16SOLEDAD_UPLOADING_KEYS\x10\x05\x12\x1f\n\x1bSOLEDAD_DONE_UPLOADING_KEYS\x10\x06\x12\x1c\n\x18SOLEDAD_DOWNLOADING_KEYS\x10\x07\x12!\n\x1dSOLEDAD_DONE_DOWNLOADING_KEYS\x10\x08\x12\x1c\n\x18SOLEDAD_NEW_DATA_TO_SYNC\x10\t\x12\x1a\n\x16SOLEDAD_DONE_DATA_SYNC\x10\n\x12\x17\n\x13UPDATER_NEW_UPDATES\x10\x0b\x12\x19\n\x15UPDATER_DONE_UPDATING\x10\x0c\x32\xb9\x01\n\x13\x45ventsServerService\x12R\n\x08register\x12#.leap.common.events.RegisterRequest\x1a!.leap.common.events.EventResponse\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse2h\n\x16\x45ventsComponentService\x12N\n\x06signal\x12!.leap.common.events.SignalRequest\x1a!.leap.common.events.EventResponse') + +_EVENT = descriptor.EnumDescriptor( + name='Event', + full_name='leap.common.events.Event', + filename=None, + file=DESCRIPTOR, + values=[ + descriptor.EnumValueDescriptor( + name='CLIENT_SESSION_ID', index=0, number=1, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='CLIENT_UID', index=1, number=2, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='SOLEDAD_CREATING_KEYS', index=2, number=3, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='SOLEDAD_DONE_CREATING_KEYS', index=3, number=4, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='SOLEDAD_UPLOADING_KEYS', index=4, number=5, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='SOLEDAD_DONE_UPLOADING_KEYS', index=5, number=6, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='SOLEDAD_DOWNLOADING_KEYS', index=6, number=7, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='SOLEDAD_DONE_DOWNLOADING_KEYS', index=7, number=8, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='SOLEDAD_NEW_DATA_TO_SYNC', index=8, number=9, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='SOLEDAD_DONE_DATA_SYNC', index=9, number=10, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='UPDATER_NEW_UPDATES', index=10, number=11, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='UPDATER_DONE_UPDATING', index=11, number=12, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=432, + serialized_end=773, +) + + +CLIENT_SESSION_ID = 1 +CLIENT_UID = 2 +SOLEDAD_CREATING_KEYS = 3 +SOLEDAD_DONE_CREATING_KEYS = 4 +SOLEDAD_UPLOADING_KEYS = 5 +SOLEDAD_DONE_UPLOADING_KEYS = 6 +SOLEDAD_DOWNLOADING_KEYS = 7 +SOLEDAD_DONE_DOWNLOADING_KEYS = 8 +SOLEDAD_NEW_DATA_TO_SYNC = 9 +SOLEDAD_DONE_DATA_SYNC = 10 +UPDATER_NEW_UPDATES = 11 +UPDATER_DONE_UPDATING = 12 + + +_EVENTRESPONSE_STATUS = descriptor.EnumDescriptor( + name='Status', + full_name='leap.common.events.EventResponse.Status', + filename=None, + file=DESCRIPTOR, + values=[ + descriptor.EnumValueDescriptor( + name='OK', index=0, number=1, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='UNAUTH', index=1, number=2, + options=None, + type=None), + descriptor.EnumValueDescriptor( + name='ERROR', index=2, number=3, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=390, + serialized_end=429, +) + + +_SIGNALREQUEST = descriptor.Descriptor( + name='SignalRequest', + full_name='leap.common.events.SignalRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + descriptor.FieldDescriptor( + name='event', full_name='leap.common.events.SignalRequest.event', index=0, + number=1, type=14, cpp_type=8, label=2, + has_default_value=False, default_value=1, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + descriptor.FieldDescriptor( + name='content', full_name='leap.common.events.SignalRequest.content', index=1, + number=2, type=9, cpp_type=9, label=2, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + descriptor.FieldDescriptor( + name='mac_method', full_name='leap.common.events.SignalRequest.mac_method', index=2, + number=3, type=9, cpp_type=9, label=2, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + descriptor.FieldDescriptor( + name='mac', full_name='leap.common.events.SignalRequest.mac', index=3, + number=4, type=12, cpp_type=9, label=2, + has_default_value=False, default_value="", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + descriptor.FieldDescriptor( + name='enc_method', full_name='leap.common.events.SignalRequest.enc_method', index=4, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + descriptor.FieldDescriptor( + name='error_occurred', full_name='leap.common.events.SignalRequest.error_occurred', index=5, + number=6, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + serialized_start=37, + serialized_end=188, +) + + +_REGISTERREQUEST = descriptor.Descriptor( + name='RegisterRequest', + full_name='leap.common.events.RegisterRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + descriptor.FieldDescriptor( + name='event', full_name='leap.common.events.RegisterRequest.event', index=0, + number=1, type=14, cpp_type=8, label=2, + has_default_value=False, default_value=1, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + descriptor.FieldDescriptor( + name='port', full_name='leap.common.events.RegisterRequest.port', index=1, + number=2, type=5, cpp_type=1, label=2, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + descriptor.FieldDescriptor( + name='mac_method', full_name='leap.common.events.RegisterRequest.mac_method', index=2, + number=3, type=9, cpp_type=9, label=2, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + descriptor.FieldDescriptor( + name='mac', full_name='leap.common.events.RegisterRequest.mac', index=3, + number=4, type=12, cpp_type=9, label=2, + has_default_value=False, default_value="", + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + serialized_start=190, + serialized_end=296, +) + + +_EVENTRESPONSE = descriptor.Descriptor( + name='EventResponse', + full_name='leap.common.events.EventResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + descriptor.FieldDescriptor( + name='status', full_name='leap.common.events.EventResponse.status', index=0, + number=1, type=14, cpp_type=8, label=2, + has_default_value=False, default_value=1, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + descriptor.FieldDescriptor( + name='result', full_name='leap.common.events.EventResponse.result', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=unicode("", "utf-8"), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _EVENTRESPONSE_STATUS, + ], + options=None, + is_extendable=False, + extension_ranges=[], + serialized_start=299, + serialized_end=429, +) + + +_SIGNALREQUEST.fields_by_name['event'].enum_type = _EVENT +_REGISTERREQUEST.fields_by_name['event'].enum_type = _EVENT +_EVENTRESPONSE.fields_by_name['status'].enum_type = _EVENTRESPONSE_STATUS +_EVENTRESPONSE_STATUS.containing_type = _EVENTRESPONSE + + +class SignalRequest(message.Message): + __metaclass__ = reflection.GeneratedProtocolMessageType + DESCRIPTOR = _SIGNALREQUEST + + # @@protoc_insertion_point(class_scope:leap.common.events.SignalRequest) + + +class RegisterRequest(message.Message): + __metaclass__ = reflection.GeneratedProtocolMessageType + DESCRIPTOR = _REGISTERREQUEST + + # @@protoc_insertion_point(class_scope:leap.common.events.RegisterRequest) + + +class EventResponse(message.Message): + __metaclass__ = reflection.GeneratedProtocolMessageType + DESCRIPTOR = _EVENTRESPONSE + + # @@protoc_insertion_point(class_scope:leap.common.events.EventResponse) + + +_EVENTSSERVERSERVICE = descriptor.ServiceDescriptor( + name='EventsServerService', + full_name='leap.common.events.EventsServerService', + file=DESCRIPTOR, + index=0, + options=None, + serialized_start=776, + serialized_end=961, + methods=[ + descriptor.MethodDescriptor( + name='register', + full_name='leap.common.events.EventsServerService.register', + index=0, + containing_service=None, + input_type=_REGISTERREQUEST, + output_type=_EVENTRESPONSE, + options=None, + ), + descriptor.MethodDescriptor( + name='signal', + full_name='leap.common.events.EventsServerService.signal', + index=1, + containing_service=None, + input_type=_SIGNALREQUEST, + output_type=_EVENTRESPONSE, + options=None, + ), + ]) + + +class EventsServerService(service.Service): + __metaclass__ = service_reflection.GeneratedServiceType + DESCRIPTOR = _EVENTSSERVERSERVICE + + +class EventsServerService_Stub(EventsServerService): + __metaclass__ = service_reflection.GeneratedServiceStubType + DESCRIPTOR = _EVENTSSERVERSERVICE + + +_EVENTSCOMPONENTSERVICE = descriptor.ServiceDescriptor( + name='EventsComponentService', + full_name='leap.common.events.EventsComponentService', + file=DESCRIPTOR, + index=1, + options=None, + serialized_start=963, + serialized_end=1067, + methods=[ + descriptor.MethodDescriptor( + name='signal', + full_name='leap.common.events.EventsComponentService.signal', + index=0, + containing_service=None, + input_type=_SIGNALREQUEST, + output_type=_EVENTRESPONSE, + options=None, + ), + ]) + + +class EventsComponentService(service.Service): + __metaclass__ = service_reflection.GeneratedServiceType + DESCRIPTOR = _EVENTSCOMPONENTSERVICE + + +class EventsComponentService_Stub(EventsComponentService): + __metaclass__ = service_reflection.GeneratedServiceStubType + DESCRIPTOR = _EVENTSCOMPONENTSERVICE + +# @@protoc_insertion_point(module_scope) diff --git a/src/leap/common/events/mac_auth.py b/src/leap/common/events/mac_auth.py new file mode 100644 index 0000000..49d48f7 --- /dev/null +++ b/src/leap/common/events/mac_auth.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# mac_auth.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Authentication system for events. + +This is not implemented yet. +""" + + +class MacMethod(object): + """ + Representation of possible MAC authentication methods. + """ + + MAC_NONE = 'none' + MAC_HMAC = 'hmac' diff --git a/src/leap/common/events/server.py b/src/leap/common/events/server.py new file mode 100644 index 0000000..16c6513 --- /dev/null +++ b/src/leap/common/events/server.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# server.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +A server for the events mechanism. + +A server can receive different kinds of requests from components: + + 1. Registration request: store component port number to be notified when + a specific signal arrives. + + 2. Signal request: redistribute the signal to registered components. +""" +import logging +import socket + + +from protobuf.socketrpc import RpcService +from leap.common.events import ( + events_pb2 as proto, + daemon, +) + + +logger = logging.getLogger(__name__) + + +SERVER_PORT = 8090 + +# the `registered_components` dictionary below should have the following +# format: +# +# { event_signal: [ port, ... ], ... } +# +registered_components = {} + + +def ensure_server(port=SERVER_PORT): + """ + Make sure the server is running on the given port. + + Attempt to connect to given local port. Upon success, assume that the + events server has already been started. Upon failure, start events server. + + @param port: the port in which server should be listening + @type port: int + + @return: the daemon instance or nothing + @rtype: EventsServerDaemon or None + """ + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('localhost', port)) + s.close() + logger.info('Server is already running on port %d.', port) + return None + except socket.error: + logger.info('Launching server on port %d.', port) + return EventsServerDaemon.ensure(port) + + +class EventsServerService(proto.EventsServerService): + """ + Service for receiving events in components. + """ + + def register(self, controller, request, done): + """ + Register a component port to be signaled when specific events come in. + + @param controller: used to mediate a single method call + @type controller: protobuf.socketrpc.controller.SocketRpcController + @param request: the request received from the component + @type request: leap.common.events.events_pb2.RegisterRequest + @param done: callback to be called when done + @type done: protobuf.socketrpc.server.Callback + """ + logger.info("Received registration request: %s" % str(request)) + # add component port to signal list + if request.event not in registered_components: + registered_components[request.event] = set([]) + registered_components[request.event].add(request.port) + # send response back to component + + logger.debug('sending response back') + response = proto.EventResponse() + response.status = proto.EventResponse.OK + done.run(response) + + def signal(self, controller, request, done): + """ + Perform an RPC call to signal all components registered to receive a + specific signal. + + @param controller: used to mediate a single method call + @type controller: protobuf.socketrpc.controller.SocketRpcController + @param request: the request received from the component + @type request: leap.common.events.events_pb2.SignalRequest + @param done: callback to be called when done + @type done: protobuf.socketrpc.server.Callback + """ + logger.info('Received signal from component: %s', str(request)) + # send signal to all registered components + # TODO: verify signal auth + if request.event in registered_components: + for port in registered_components[request.event]: + + def callback(req, resp): + logger.info("Signal received by " + str(port)) + + service = RpcService(proto.EventsComponentService_Stub, + port, 'localhost') + service.signal(request, callback=callback) + # send response back to component + response = proto.EventResponse() + response.status = proto.EventResponse.OK + done.run(response) + + +class EventsServerDaemon(daemon.EventsSingletonDaemon): + """ + Singleton class for starting an events server daemon. + """ + + @classmethod + def ensure(cls, port): + """ + Make sure the daemon is running on the given port. + + @param port: the port in which the daemon should listen + @type port: int + + @return: a daemon instance + @rtype: EventsServerDaemon + """ + return cls.ensure_service(port, EventsServerService()) diff --git a/src/leap/common/tests/__init__.py b/src/leap/common/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/leap/common/tests/__init__.py diff --git a/src/leap/common/tests/test_events.py b/src/leap/common/tests/test_events.py new file mode 100644 index 0000000..8c0bd36 --- /dev/null +++ b/src/leap/common/tests/test_events.py @@ -0,0 +1,200 @@ +## -*- coding: utf-8 -*- +# test_events.py +# Copyright (C) 2013 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest +import sets +import time +from protobuf.socketrpc import RpcService +from leap.common import events +from leap.common.events import ( + server, + component, + mac_auth, +) +from leap.common.events.events_pb2 import ( + EventsServerService, + EventsServerService_Stub, + EventResponse, + SignalRequest, + RegisterRequest, + SOLEDAD_CREATING_KEYS, + CLIENT_UID, +) + + +port = 8090 + +received = False +local_callback_executed = False + + +def callback(request, reponse): + return True + + +class EventsTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + server.EventsServerDaemon.ensure(8090) + cls.callbacks = events.component.registered_callbacks + + @classmethod + def tearDownClass(cls): + # give some time for requests to be processed. + time.sleep(1) + + def setUp(self): + super(EventsTestCase, self).setUp() + + def tearDown(self): + #events.component.registered_callbacks = {} + server.registered_components = {} + super(EventsTestCase, self).tearDown() + + def test_service_singleton(self): + """ + Ensure that there's always just one instance of the server daemon + running. + """ + service1 = server.EventsServerDaemon.ensure(8090) + service2 = server.EventsServerDaemon.ensure(8090) + self.assertEqual(service1, service2, + "Can't get singleton class for service.") + + def test_component_register(self): + """ + Ensure components can register callbacks. + """ + self.assertTrue(1 not in self.callbacks, + 'There should should be no callback for this signal.') + events.register(1, lambda x: True) + self.assertTrue(1 in self.callbacks, + 'Could not register signal in local component.') + events.register(2, lambda x: True) + self.assertTrue(1 in self.callbacks, + 'Could not register signal in local component.') + self.assertTrue(2 in self.callbacks, + 'Could not register signal in local component.') + + def test_register_signal_replace(self): + """ + Make sure components can replace already registered callbacks. + """ + sig = 3 + cbk = lambda x: True + events.register(sig, cbk, uid=1) + self.assertRaises(Exception, events.register, sig, lambda x: True, + uid=1) + events.register(sig, lambda x: True, uid=1, replace=True) + self.assertTrue(sig in self.callbacks, 'Could not register signal.') + self.assertEqual(1, len(self.callbacks[sig]), + 'Wrong number of registered callbacks.') + + def test_signal_response_status(self): + """ + Ensure there's an appropriate response from server when signaling. + """ + sig = 4 + request = SignalRequest() + request.event = sig + request.content = 'my signal contents' + request.mac_method = mac_auth.MacMethod.MAC_NONE + request.mac = "" + service = RpcService(EventsServerService_Stub, port, 'localhost') + # test synch + response = service.signal(request, timeout=1000) + self.assertEqual(EventResponse.OK, response.status, + 'Wrong response status.') + # test asynch + + def local_callback(request, response): + global local_callback_executed + local_callback_executed = True + + events.register(sig, local_callback) + service.signal(request, callback=local_callback) + time.sleep(0.1) + self.assertTrue(local_callback_executed, + 'Local callback did not execute.') + + def test_events_server_service_register(self): + """ + Ensure the server can register components to be signaled. + """ + sig = 5 + request = RegisterRequest() + request.event = sig + request.port = 8091 + request.mac_method = mac_auth.MacMethod.MAC_NONE + request.mac = "" + service = RpcService(EventsServerService_Stub, port, 'localhost') + complist = server.registered_components + self.assertEqual({}, complist, + 'There should be no registered_ports when ' + 'server has just been created.') + response = service.register(request, timeout=1000) + self.assertTrue(sig in complist, "Signal not registered succesfully.") + self.assertTrue(8091 in complist[sig], + 'Failed registering component port.') + + def test_component_request_register(self): + """ + Ensure components can register themselves with server. + """ + sig = 6 + complist = server.registered_components + self.assertTrue(sig not in complist, + 'There should be no registered components for this ' + 'signal.') + events.register(sig, lambda x: True) + time.sleep(0.1) + port = component.EventsComponentDaemon.get_instance().get_port() + self.assertTrue(sig in complist, 'Failed registering component.') + self.assertTrue(port in complist[sig], + 'Failed registering component port.') + + def test_component_receives_signal(self): + """ + Ensure components can receive signals. + """ + sig = 7 + + def getsig(param=None): + global received + received = True + + events.register(sig, getsig) + request = SignalRequest() + request.event = sig + request.content = "" + request.mac_method = mac_auth.MacMethod.MAC_NONE + request.mac = "" + service = RpcService(EventsServerService_Stub, port, 'localhost') + response = service.signal(request, timeout=1000) + self.assertTrue(response is not None, 'Did not receive response.') + time.sleep(0.5) + self.assertTrue(received, 'Did not receive signal back.') + + def test_component_send_signal(self): + """ + Ensure components can send signals. + """ + sig = 8 + response = events.signal(sig) + self.assertTrue(response.status == response.OK, + 'Received wrong response status when signaling.') |