""" Configuration Base Class """ import grp import json import logging import socket import tempfile import os logger = logging.getLogger(name=__name__) logger.setLevel('DEBUG') import configuration import requests from leap.base import exceptions from leap.base import constants from leap.util.fileutil import (mkdir_p) # move to base! from leap.eip import exceptions as eipexceptions class BaseLeapConfig(object): slug = None # XXX we have to enforce that every derived class # has a slug (via interface) # get property getter that raises NI.. def save(self): raise NotImplementedError("abstract base class") def load(self): raise NotImplementedError("abstract base class") def get_config(self, *kwargs): raise NotImplementedError("abstract base class") #XXX todo: enable this property after #fixing name clash with "config" in use at #vpnconnection #@property #def config(self): #return self.get_config() def get_value(self, *kwargs): raise NotImplementedError("abstract base class") class MetaConfigWithSpec(type): """ metaclass for JSONLeapConfig classes. It creates a configuration spec out of the `spec` dictionary. """ # XXX in the near future, this is the # place where we want to enforce # singletons, read-only and stuff. # TODO: # - add a error handler for missing options that # we can act easily upon (sys.exit is ugly, for $deity's sake) def __new__(meta, classname, bases, classDict): spec_options = classDict.get('spec', None) # not quite happy with this workaround. # I want to raise if missing spec dict, but only # for grand-children of this metaclass. # maybe should use abc module for this. 
abcderived = ("JSONLeapConfig",) if spec_options is None and classname not in abcderived: raise exceptions.ImproperlyConfigured( "missing spec dict on your derived class") # we create a configuration spec attribute from the spec dict config_class = type( classname + "Spec", (configuration.Configuration, object), {'options': spec_options}) classDict['spec'] = config_class return type.__new__(meta, classname, bases, classDict) ########################################################## # hacking in progress: # Configs have: # - a slug (from where a filename/folder is derived) # - a spec (for validation and defaults). # this spec is basically a dict that will be used # for type casting and validation, and defaults settings. # all config objects, since they are derived from BaseConfig, implement basic # useful methods: # - save # - load # - get_config (returns a optparse.OptionParser object) # TODO: # - have a good type cast repertory (uris, version, hashes...) # - raise validation errors # - multilingual objects ########################################################## class JSONLeapConfig(BaseLeapConfig): __metaclass__ = MetaConfigWithSpec def __init__(self, *args, **kwargs): # sanity check try: assert self.slug is not None except AssertionError: raise exceptions.ImproperlyConfigured( "missing slug on JSONLeapConfig" " derived class") try: assert self.spec is not None except AssertionError: raise exceptions.ImproperlyConfigured( "missing spec on JSONLeapConfig" " derived class") assert issubclass(self.spec, configuration.Configuration) self._config = self.spec() self._config.parse_args(list(args)) self.fetcher = kwargs.pop('fetcher', requests) # mandatory baseconfig interface def save(self, to=None): if to is None: to = self.filename folder, filename = os.path.split(to) if folder and not os.path.isdir(folder): mkdir_p(folder) # lazy evaluation until first level of nesting # to allow lambdas with context-dependant info # like os.path.expanduser config = 
self.get_config() for k, v in config.iteritems(): if callable(v): config[k] = v() self._config.serialize(to) def load(self, fromfile=None, from_uri=None, fetcher=None, verify=False): if from_uri is not None: fetched = self.fetch(from_uri, fetcher=fetcher, verify=verify) if fetched: return if fromfile is None: fromfile = self.filename newconfig = self._config.deserialize(fromfile) # XXX check for no errors, etc self._config.config = newconfig def fetch(self, uri, fetcher=None, verify=True): if not fetcher: fetcher = self.fetcher logger.debug('verify: %s', verify) request = fetcher.get(uri, verify=verify) # XXX get 404, ... # and raise a UnableToFetch... request.raise_for_status() fd, fname = tempfile.mkstemp(suffix=".json") if not request.json: try: json.loads(request.content) except ValueError: raise eipexceptions.LeapBadConfigFetchedError with open(fname, 'w') as tmp: tmp.write(json.dumps(request.json)) self._loadtemp(fname) return True def get_config(self): return self._config.config # public methods def get_filename(self): return self._slug_to_filename() @property def filename(self): return self.get_filename() # private def _loadtemp(self, filename): self.load(fromfile=filename) os.remove(filename) def _slug_to_filename(self): # is this going to work in winland if slug is "foo/bar" ? folder, filename = os.path.split(self.slug) # XXX fix import config_file = get_config_file(filename, folder) return config_file def exists(self): return os.path.isfile(self.filename) # # utility functions # # (might be moved to some class as we see fit, but # let's remain functional for a while) # maybe base.config.util ?? # def get_config_dir(): """ get the base dir for all leap config @rparam: config path @rtype: string """ # TODO # check for $XDG_CONFIG_HOME var? # get a more sensible path for win/mac # kclair: opinion? 
^^ return os.path.expanduser( os.path.join('~', '.config', 'leap')) def get_config_file(filename, folder=None): """ concatenates the given filename with leap config dir. @param filename: name of the file @type filename: string @rparam: full path to config file """ path = [] path.append(get_config_dir()) if folder is not None: path.append(folder) path.append(filename) return os.path.join(*path) def get_default_provider_path(): default_subpath = os.path.join("providers", constants.DEFAULT_PROVIDER) default_provider_path = get_config_file( '', folder=default_subpath) return default_provider_path def validate_ip(ip_str): """ raises exception if the ip_str is not a valid representation of an ip """ socket.inet_aton(ip_str) def get_username(): return os.getlogin() def get_groupname(): gid = os.getgroups()[-1] return grp.getgrgid(gid).gr_name # json stuff # XXX merge with JSONConfig / EIPChecks as appropiate. #def get_config_json(config_file=None): #""" #will replace get_config function be developing them #in parralel for branch purposes. #@param: configuration file #@type: file #@rparam: configuration turples #@rtype: dictionary #""" #if not config_file: #TODO: NOT SURE WHAT this default should be, if anything #fpath = get_config_file('eip.json') #if not os.path.isfile(fpath): #dpath, cfile = os.path.split(fpath) #if not os.path.isdir(dpath): #mkdir_p(dpath) #with open(fpath, 'wb') as configfile: #configfile.flush() #try: #return json.load(open(fpath)) #except ValueError: #raise exceptions.MissingConfigFileError # #else: #TODO: add validity checks of file #try: #return json.load(open(config_file)) #except IOError: #raise exceptions.MissingConfigFileError # # #def get_definition_file(url=None): #""" #""" #TODO: determine good default location of definition file. 
#    r = requests.get(url)
#    return r.json
#
#
#def is_internet_up():
#    """TODO: Build more robust network diagnosis capabilities
#    """
#    try:
#        requests.get('http://128.30.52.45', timeout=1)
#        return True
#    except requests.Timeout:  # as err:
#        pass
#    return False
#
# XXX DEPRECATE.
# move to eip.checks
#
# XXX merge conflict
# some tests are still using this deprecated Configuration object.
# moving it here transiently until I clean merge commit.
# -- kali 2012-08-24 00:32
#
#
#
#class Configuration(object):
#    """
#    All configurations (providers et al) will be managed in this class.
#    """
#    def __init__(self, provider_url=None):
#        try:
#            requests.get('foo')
#            self.providers = {}
#            self.error = False
#            provider_file = self.check_and_get_definition_file(provider_url)
#            self.providers['default'] = get_config_json(provider_file)
#        except (requests.HTTPError, requests.RequestException) as e:
#            self.error = e.message
#        except requests.ConnectionError as e:
#            if e.message == "[Errno 113] No route to host":
#                if not is_internet_up:
#                    # this was meant to be a function invocation I guess...
#                    self.error = "No valid internet connection found"
#                else:
#                    self.error = "Provider server appears currently down."
#
#    def check_and_get_definition_file(self, provider_url):
#        """
#        checks if provider definition.json file is present.
#        if not downloads one from the web.
#        """
#        default_provider_path = get_default_provider_path()
#
#        if not os.path.isdir(default_provider_path):
#            mkdir_p(default_provider_path)
#
#        definition_file = get_config_file(
#            'definition.json',
#            folder=default_provider_path)
#
#        if os.path.isfile(definition_file):
#            return
#
#        else:
#            r = requests.get(provider_url)
#            r.raise_for_status()
#            with open(definition_file, 'wb') as f:
#                f.write(json.dumps(r.json, indent=4))
#            return definition_file