diff options
Diffstat (limited to 'src/leap/base/config.py')
-rw-r--r-- | src/leap/base/config.py | 348 |
1 files changed, 0 insertions, 348 deletions
diff --git a/src/leap/base/config.py b/src/leap/base/config.py deleted file mode 100644 index 85bb3d66..00000000 --- a/src/leap/base/config.py +++ /dev/null @@ -1,348 +0,0 @@ -""" -Configuration Base Class -""" -import grp -import json -import logging -import re -import socket -import time -import os - -logger = logging.getLogger(name=__name__) - -from dateutil import parser as dateparser -from xdg import BaseDirectory -import requests - -from leap.base import exceptions -from leap.base import constants -from leap.base.pluggableconfig import PluggableConfig -from leap.util.fileutil import (mkdir_p) - -# move to base! -from leap.eip import exceptions as eipexceptions - - -class BaseLeapConfig(object): - slug = None - - # XXX we have to enforce that every derived class - # has a slug (via interface) - # get property getter that raises NI.. - - def save(self): - raise NotImplementedError("abstract base class") - - def load(self): - raise NotImplementedError("abstract base class") - - def get_config(self, *kwargs): - raise NotImplementedError("abstract base class") - - @property - def config(self): - return self.get_config() - - def get_value(self, *kwargs): - raise NotImplementedError("abstract base class") - - -class MetaConfigWithSpec(type): - """ - metaclass for JSONLeapConfig classes. - It creates a configuration spec out of - the `spec` dictionary. The `properties` attribute - of the spec dict is turn into the `schema` attribute - of the new class (which will be used to validate against). - """ - # XXX in the near future, this is the - # place where we want to enforce - # singletons, read-only and similar stuff. - - def __new__(meta, classname, bases, classDict): - schema_obj = classDict.get('spec', None) - - # not quite happy with this workaround. - # I want to raise if missing spec dict, but only - # for grand-children of this metaclass. - # maybe should use abc module for this. - abcderived = ("JSONLeapConfig",) - if schema_obj is None and classname not in abcderived: - raise exceptions.ImproperlyConfigured( - "missing spec dict on your derived class (%s)" % classname) - - # we create a configuration spec attribute - # from the spec dict - config_class = type( - classname + "Spec", - (PluggableConfig, object), - {'options': schema_obj}) - classDict['spec'] = config_class - - return type.__new__(meta, classname, bases, classDict) - -########################################################## -# some hacking still in progress: - -# Configs have: - -# - a slug (from where a filename/folder is derived) -# - a spec (for validation and defaults). -# this spec is conformant to the json-schema. -# basically a dict that will be used -# for type casting and validation, and defaults settings. - -# all config objects, since they are derived from BaseConfig, implement basic -# useful methods: -# - save -# - load - -########################################################## - - -class JSONLeapConfig(BaseLeapConfig): - - __metaclass__ = MetaConfigWithSpec - - def __init__(self, *args, **kwargs): - # sanity check - try: - assert self.slug is not None - except AssertionError: - raise exceptions.ImproperlyConfigured( - "missing slug on JSONLeapConfig" - " derived class") - try: - assert self.spec is not None - except AssertionError: - raise exceptions.ImproperlyConfigured( - "missing spec on JSONLeapConfig" - " derived class") - assert issubclass(self.spec, PluggableConfig) - - self.domain = kwargs.pop('domain', None) - self._config = self.spec(format="json") - self._config.load() - self.fetcher = kwargs.pop('fetcher', requests) - - # mandatory baseconfig interface - - def save(self, to=None, force=False): - """ - force param will skip the dirty check. - :type force: bool - """ - # XXX this force=True does not feel to right - # but still have to look for a better way - # of dealing with dirtiness and the - # trick of loading remote config only - # when newer. - - if force: - do_save = True - else: - do_save = self._config.is_dirty() - - if do_save: - if to is None: - to = self.filename - folder, filename = os.path.split(to) - if folder and not os.path.isdir(folder): - mkdir_p(folder) - self._config.serialize(to) - return True - - else: - return False - - def load(self, fromfile=None, from_uri=None, fetcher=None, - force_download=False, verify=True): - - if from_uri is not None: - fetched = self.fetch( - from_uri, - fetcher=fetcher, - verify=verify, - force_dl=force_download) - if fetched: - return - if fromfile is None: - fromfile = self.filename - if os.path.isfile(fromfile): - self._config.load(fromfile=fromfile) - else: - logger.warning('tried to load config from non-existent path') - logger.warning('Not Found: %s', fromfile) - - def fetch(self, uri, fetcher=None, verify=True, force_dl=False): - if not fetcher: - fetcher = self.fetcher - - logger.debug('uri: %s (verify: %s)' % (uri, verify)) - - rargs = (uri, ) - rkwargs = {'verify': verify} - headers = {} - - curmtime = self.get_mtime() if not force_dl else None - if curmtime: - logger.debug('requesting with if-modified-since %s' % curmtime) - headers['if-modified-since'] = curmtime - rkwargs['headers'] = headers - - #request = fetcher.get(uri, verify=verify) - request = fetcher.get(*rargs, **rkwargs) - request.raise_for_status() - - if request.status_code == 304: - logger.debug('...304 Not Changed') - # On this point, we have to assume that - # we HAD the filename. If that filename is corruct, - # we should enforce a force_download in the load - # method above. - self._config.load(fromfile=self.filename) - return True - - if request.json: - mtime = None - last_modified = request.headers.get('last-modified', None) - if last_modified: - _mtime = dateparser.parse(last_modified) - mtime = int(_mtime.strftime("%s")) - if callable(request.json): - _json = request.json() - else: - # back-compat - _json = request.json - self._config.load(json.dumps(_json), mtime=mtime) - self._config.set_dirty() - else: - # not request.json - # might be server did not announce content properly, - # let's try deserializing all the same. - try: - self._config.load(request.content) - self._config.set_dirty() - except ValueError: - raise eipexceptions.LeapBadConfigFetchedError - - return True - - def get_mtime(self): - try: - _mtime = os.stat(self.filename)[8] - mtime = time.strftime("%c GMT", time.gmtime(_mtime)) - return mtime - except OSError: - return None - - def get_config(self): - return self._config.config - - # public methods - - def get_filename(self): - return self._slug_to_filename() - - @property - def filename(self): - return self.get_filename() - - def validate(self, data): - logger.debug('validating schema') - self._config.validate(data) - return True - - # private - - def _slug_to_filename(self): - # is this going to work in winland if slug is "foo/bar" ? - folder, filename = os.path.split(self.slug) - config_file = get_config_file(filename, folder) - return config_file - - def exists(self): - return os.path.isfile(self.filename) - - -# -# utility functions -# -# (might be moved to some class as we see fit, but -# let's remain functional for a while) -# maybe base.config.util ?? -# - - -def get_config_dir(): - """ - get the base dir for all leap config - @rparam: config path - @rtype: string - """ - home = os.path.expanduser("~") - if re.findall("leap_tests-[_a-zA-Z0-9]{6}", home): - # we're inside a test! :) - return os.path.join(home, ".config/leap") - else: - # XXX dirspec is cross-platform, - # we should borrow some of those - # routines for osx/win and wrap this call. - return os.path.join(BaseDirectory.xdg_config_home, - 'leap') - - -def get_config_file(filename, folder=None): - """ - concatenates the given filename - with leap config dir. - @param filename: name of the file - @type filename: string - @rparam: full path to config file - """ - path = [] - path.append(get_config_dir()) - if folder is not None: - path.append(folder) - path.append(filename) - return os.path.join(*path) - - -def get_default_provider_path(): - default_subpath = os.path.join("providers", - constants.DEFAULT_PROVIDER) - default_provider_path = get_config_file( - '', - folder=default_subpath) - return default_provider_path - - -def get_provider_path(domain): - # XXX if not domain, return get_default_provider_path - default_subpath = os.path.join("providers", domain) - provider_path = get_config_file( - '', - folder=default_subpath) - return provider_path - - -def validate_ip(ip_str): - """ - raises exception if the ip_str is - not a valid representation of an ip - """ - socket.inet_aton(ip_str) - - -def get_username(): - try: - return os.getlogin() - except OSError as e: - import pwd - return pwd.getpwuid(os.getuid())[0] - - -def get_groupname(): - gid = os.getgroups()[-1] - return grp.getgrgid(gid).gr_name |