# -*- coding: utf-8 -*- # configurable.py # Copyright (C) 2015, 2016 LEAP # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Configurable Backend for Bitmask Service. """ import ConfigParser import locale import os import re import sys from twisted.application import service from twisted.python import log from leap.common import files class MissingConfigEntry(Exception): """ A required config entry was not found. """ class ConfigurableService(service.MultiService): config_file = u"bitmaskd.cfg" service_names = ('mail', 'eip', 'zmq', 'web') def __init__(self, basedir='~/.config/leap'): service.MultiService.__init__(self) path = os.path.abspath(os.path.expanduser(basedir)) if not os.path.isdir(path): files.mkdir_p(path) self.basedir = path # creates self.config self.read_config() def get_config(self, section, option, default=None, boolean=False): try: if boolean: return self.config.getboolean(section, option) item = self.config.get(section, option) return item except (ConfigParser.NoOptionError, ConfigParser.NoSectionError): if default is None: fn = os.path.join(self.basedir, self.config_file) raise MissingConfigEntry("%s is missing the [%s]%s entry" % (_quote_output(fn), section, option)) return default def set_config(self, section, option, value): if not self.config.has_section(section): self.config.add_section(section) self.config.set(section, option, value) self.save_config() self.read_config() assert self.config.get(section, option) == value def read_config(self): self.config = ConfigParser.SafeConfigParser() bitmaskd_cfg = self._get_config_path() if not os.path.isfile(bitmaskd_cfg): self._create_default_config(bitmaskd_cfg) try: with open(bitmaskd_cfg, "rb") as f: self.config.readfp(f) except EnvironmentError: if os.path.exists(bitmaskd_cfg): raise def save_config(self): bitmaskd_cfg = self._get_config_path() with open(bitmaskd_cfg, 'wb') as f: self.config.write(f) def _create_default_config(self, path): with open(path, 'w') as outf: outf.write(DEFAULT_CONFIG) def _get_config_path(self): return os.path.join(self.basedir, self.config_file) DEFAULT_CONFIG = """ [services] mail = True eip = True zmq = True web = False """ def canonical_encoding(encoding): if encoding is None: log.msg("Warning: falling back to UTF-8 encoding.", level=log.WEIRD) encoding = 'utf-8' encoding = encoding.lower() if encoding == "cp65001": encoding = 'utf-8' elif (encoding == "us-ascii" or encoding == "646" or encoding == "ansi_x3.4-1968"): encoding = 'ascii' return encoding def check_encoding(encoding): # sometimes Python returns an encoding name that it doesn't support for # conversion fail early if this happens try: u"test".encode(encoding) except (LookupError, AttributeError): raise AssertionError( "The character encoding '%s' is not supported for conversion." % ( encoding,)) filesystem_encoding = None io_encoding = None is_unicode_platform = False def _reload(): global filesystem_encoding, io_encoding, is_unicode_platform filesystem_encoding = canonical_encoding(sys.getfilesystemencoding()) check_encoding(filesystem_encoding) if sys.platform == 'win32': # On Windows we install UTF-8 stream wrappers for sys.stdout and # sys.stderr, and reencode the arguments as UTF-8 (see # scripts/runner.py). io_encoding = 'utf-8' else: ioenc = None if hasattr(sys.stdout, 'encoding'): ioenc = sys.stdout.encoding if ioenc is None: try: ioenc = locale.getpreferredencoding() except Exception: pass # work around io_encoding = canonical_encoding(ioenc) check_encoding(io_encoding) is_unicode_platform = sys.platform in ["win32", "darwin"] _reload() def _quote_output(s, quotemarks=True, quote_newlines=None, encoding=None): """ Encode either a Unicode string or a UTF-8-encoded bytestring for representation on stdout or stderr, tolerating errors. If 'quotemarks' is True, the string is always quoted; otherwise, it is quoted only if necessary to avoid ambiguity or control bytes in the output. (Newlines are counted as control bytes iff quote_newlines is True.) Quoting may use either single or double quotes. Within single quotes, all characters stand for themselves, and ' will not appear. Within double quotes, Python-compatible backslash escaping is used. If not explicitly given, quote_newlines is True when quotemarks is True. """ assert isinstance(s, (str, unicode)) if quote_newlines is None: quote_newlines = quotemarks if isinstance(s, str): try: s = s.decode('utf-8') except UnicodeDecodeError: return 'b"%s"' % ( ESCAPABLE_8BIT.sub( lambda m: _str_escape(m, quote_newlines), s),) must_double_quote = (quote_newlines and MUST_DOUBLE_QUOTE_NL or MUST_DOUBLE_QUOTE) if must_double_quote.search(s) is None: try: out = s.encode(encoding or io_encoding) if quotemarks or out.startswith('"'): return "'%s'" % (out,) else: return out except (UnicodeDecodeError, UnicodeEncodeError): pass escaped = ESCAPABLE_UNICODE.sub( lambda m: _unicode_escape(m, quote_newlines), s) return '"%s"' % ( escaped.encode(encoding or io_encoding, 'backslashreplace'),) def _unicode_escape(m, quote_newlines): u = m.group(0) if u == u'"' or u == u'$' or u == u'`' or u == u'\\': return u'\\' + u elif u == u'\n' and not quote_newlines: return u if len(u) == 2: codepoint = ( ord(u[0]) - 0xD800) * 0x400 + ord(u[1]) - 0xDC00 + 0x10000 else: codepoint = ord(u) if codepoint > 0xFFFF: return u'\\U%08x' % (codepoint,) elif codepoint > 0xFF: return u'\\u%04x' % (codepoint,) else: return u'\\x%02x' % (codepoint,) def _str_escape(m, quote_newlines): c = m.group(0) if c == '"' or c == '$' or c == '`' or c == '\\': return '\\' + c elif c == '\n' and not quote_newlines: return c else: return '\\x%02x' % (ord(c),) MUST_DOUBLE_QUOTE_NL = re.compile( ur'[^\x20-\x26\x28-\x7E\u00A0-\uD7FF\uE000-\uFDCF\uFDF0-\uFFFC]', re.DOTALL) MUST_DOUBLE_QUOTE = re.compile( ur'[^\n\x20-\x26\x28-\x7E\u00A0-\uD7FF\uE000-\uFDCF\uFDF0-\uFFFC]', re.DOTALL) ESCAPABLE_8BIT = re.compile( r'[^ !#\x25-\x5B\x5D-\x5F\x61-\x7E]', re.DOTALL) # if we must double-quote, then we have to escape ", $ and `, but need not # escape ' ESCAPABLE_UNICODE = re.compile( ur'([\uD800-\uDBFF][\uDC00-\uDFFF])|' # valid surrogate pairs ur'[^ !#\x25-\x5B\x5D-\x5F\x61-\x7E\u00A0-\uD7FF' ur'\uE000-\uFDCF\uFDF0-\uFFFC]', re.DOTALL)