diff options
Diffstat (limited to 'src/leap/util')
-rw-r--r-- | src/leap/util/__init__.py | 9 | ||||
-rw-r--r-- | src/leap/util/certs.py | 18 | ||||
-rw-r--r-- | src/leap/util/coroutines.py | 109 | ||||
-rw-r--r-- | src/leap/util/dicts.py | 268 | ||||
-rw-r--r-- | src/leap/util/fileutil.py | 120 | ||||
-rw-r--r-- | src/leap/util/geo.py | 32 | ||||
-rw-r--r-- | src/leap/util/leap_argparse.py | 44 | ||||
-rw-r--r-- | src/leap/util/misc.py | 37 | ||||
-rw-r--r-- | src/leap/util/tests/__init__.py | 0 | ||||
-rw-r--r-- | src/leap/util/tests/test_fileutil.py | 100 | ||||
-rw-r--r-- | src/leap/util/tests/test_leap_argparse.py | 35 | ||||
-rw-r--r-- | src/leap/util/tests/test_translations.py | 22 | ||||
-rw-r--r-- | src/leap/util/translations.py | 82 | ||||
-rw-r--r-- | src/leap/util/web.py | 40 |
14 files changed, 0 insertions, 916 deletions
diff --git a/src/leap/util/__init__.py b/src/leap/util/__init__.py deleted file mode 100644 index a70a9a8b..00000000 --- a/src/leap/util/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -import logging -logger = logging.getLogger(__name__) - -try: - import pygeoip - HAS_GEOIP = True -except ImportError: - logger.debug('PyGeoIP not found. Disabled Geo support.') - HAS_GEOIP = False diff --git a/src/leap/util/certs.py b/src/leap/util/certs.py deleted file mode 100644 index f0f790e9..00000000 --- a/src/leap/util/certs.py +++ /dev/null @@ -1,18 +0,0 @@ -import os -import logging - -logger = logging.getLogger(__name__) - - -def get_mac_cabundle(): - # hackaround bundle error - # XXX this needs a better fix! - f = os.path.split(__file__)[0] - sep = os.path.sep - f_ = sep.join(f.split(sep)[:-2]) - verify = os.path.join(f_, 'cacert.pem') - #logger.error('VERIFY PATH = %s' % verify) - exists = os.path.isfile(verify) - #logger.error('do exist? %s', exists) - if exists: - return verify diff --git a/src/leap/util/coroutines.py b/src/leap/util/coroutines.py deleted file mode 100644 index 0657fc04..00000000 --- a/src/leap/util/coroutines.py +++ /dev/null @@ -1,109 +0,0 @@ -# the problem of watching a stdout pipe from -# openvpn binary: using subprocess and coroutines -# acting as event consumers - -from __future__ import division, print_function - -import logging -from subprocess import PIPE, Popen -import sys -from threading import Thread - -logger = logging.getLogger(__name__) - -ON_POSIX = 'posix' in sys.builtin_module_names - - -# -# Coroutines goodies -# - -def coroutine(func): - def start(*args, **kwargs): - cr = func(*args, **kwargs) - cr.next() - return cr - return start - - -@coroutine -def process_events(callback): - """ - coroutine loop that receives - events sent and dispatch the callback. - :param callback: callback to be called\ -for each event - :type callback: callable - """ - try: - while True: - m = (yield) - if callable(callback): - callback(m) - else: - logger.debug('not a callable passed') - except GeneratorExit: - return - -# -# Threads -# - - -def launch_thread(target, args): - """ - launch and demonize thread. - :param target: target function that will run in thread - :type target: function - :param args: args to be passed to thread - :type args: list - """ - t = Thread(target=target, - args=args) - t.daemon = True - t.start() - return t - - -def watch_output(out, observers): - """ - initializes dict of observer coroutines - and pushes lines to each of them as they are received - from the watched output. - :param out: stdout of a process. - :type out: fd - :param observers: tuple of coroutines to send data\ -for each event - :type observers: tuple - """ - observer_dict = dict(((observer, process_events(observer)) - for observer in observers)) - for line in iter(out.readline, b''): - for obs in observer_dict: - observer_dict[obs].send(line) - out.close() - - -def spawn_and_watch_process(command, args, observers=None): - """ - spawns a subprocess with command, args, and launch - a watcher thread. - :param command: command to be executed in the subprocess - :type command: str - :param args: arguments - :type args: list - :param observers: tuple of observer functions to be called \ -for each line in the subprocess output. - :type observers: tuple - :return: a tuple containing the child process instance, and watcher_thread, - :rtype: (Subprocess, Thread) - """ - subp = Popen([command] + args, - stdout=PIPE, - stderr=PIPE, - bufsize=1, - close_fds=ON_POSIX) - watcher = launch_thread( - watch_output, - (subp.stdout, observers)) - return subp, watcher diff --git a/src/leap/util/dicts.py b/src/leap/util/dicts.py deleted file mode 100644 index 001ca96b..00000000 --- a/src/leap/util/dicts.py +++ /dev/null @@ -1,268 +0,0 @@ -# Backport of OrderedDict() class that runs -# on Python 2.4, 2.5, 2.6, 2.7 and pypy. -# Passes Python2.7's test suite and incorporates all the latest updates. - -try: - from thread import get_ident as _get_ident -except ImportError: - from dummy_thread import get_ident as _get_ident - -try: - from _abcoll import KeysView, ValuesView, ItemsView -except ImportError: - pass - - -class OrderedDict(dict): - 'Dictionary that remembers insertion order' - # An inherited dict maps keys to values. - # The inherited dict provides __getitem__, __len__, __contains__, and get. - # The remaining methods are order-aware. - # Big-O running times for all methods are the same as for regular - # dictionaries. - - # The internal self.__map dictionary maps keys to links in a doubly - # linked list. - # The circular doubly linked list starts and ends with a sentinel element. - # The sentinel element never gets deleted (this simplifies the algorithm). - # Each link is stored as a list of length three: [PREV, NEXT, KEY]. - - def __init__(self, *args, **kwds): - '''Initialize an ordered dictionary. Signature is the same as for - regular dictionaries, but keyword arguments are not recommended - because their insertion order is arbitrary. - - ''' - if len(args) > 1: - raise TypeError('expected at most 1 arguments, got %d' % len(args)) - try: - self.__root - except AttributeError: - self.__root = root = [] # sentinel node - root[:] = [root, root, None] - self.__map = {} - self.__update(*args, **kwds) - - def __setitem__(self, key, value, dict_setitem=dict.__setitem__): - 'od.__setitem__(i, y) <==> od[i]=y' - # Setting a new item creates a new link which goes at the end - # of the linked list, and the inherited dictionary is updated - # with the new key/value pair. - if key not in self: - root = self.__root - last = root[0] - last[1] = root[0] = self.__map[key] = [last, root, key] - dict_setitem(self, key, value) - - def __delitem__(self, key, dict_delitem=dict.__delitem__): - 'od.__delitem__(y) <==> del od[y]' - # Deleting an existing item uses self.__map to find the link which is - # then removed by updating the links in the predecessor and successor - # nodes. - dict_delitem(self, key) - link_prev, link_next, key = self.__map.pop(key) - link_prev[1] = link_next - link_next[0] = link_prev - - def __iter__(self): - 'od.__iter__() <==> iter(od)' - root = self.__root - curr = root[1] - while curr is not root: - yield curr[2] - curr = curr[1] - - def __reversed__(self): - 'od.__reversed__() <==> reversed(od)' - root = self.__root - curr = root[0] - while curr is not root: - yield curr[2] - curr = curr[0] - - def clear(self): - 'od.clear() -> None. Remove all items from od.' - try: - for node in self.__map.itervalues(): - del node[:] - root = self.__root - root[:] = [root, root, None] - self.__map.clear() - except AttributeError: - pass - dict.clear(self) - - def popitem(self, last=True): - '''od.popitem() -> (k, v), return and remove a (key, value) pair. - Pairs are returned in LIFO order if last is true or FIFO order if - false. - ''' - if not self: - raise KeyError('dictionary is empty') - root = self.__root - if last: - link = root[0] - link_prev = link[0] - link_prev[1] = root - root[0] = link_prev - else: - link = root[1] - link_next = link[1] - root[1] = link_next - link_next[0] = root - key = link[2] - del self.__map[key] - value = dict.pop(self, key) - return key, value - - # -- the following methods do not depend on the internal structure -- - - def keys(self): - 'od.keys() -> list of keys in od' - return list(self) - - def values(self): - 'od.values() -> list of values in od' - return [self[key] for key in self] - - def items(self): - 'od.items() -> list of (key, value) pairs in od' - return [(key, self[key]) for key in self] - - def iterkeys(self): - 'od.iterkeys() -> an iterator over the keys in od' - return iter(self) - - def itervalues(self): - 'od.itervalues -> an iterator over the values in od' - for k in self: - yield self[k] - - def iteritems(self): - 'od.iteritems -> an iterator over the (key, value) items in od' - for k in self: - yield (k, self[k]) - - def update(*args, **kwds): - '''od.update(E, **F) -> None. Update od from dict/iterable E and F. - - If E is a dict instance, does: for k in E: od[k] = E[k] - If E has a .keys() method, does: for k in E.keys(): - od[k] = E[k] - Or if E is an iterable of items, does: for k, v in E: od[k] = v - In either case, this is followed by: for k, v in F.items(): - od[k] = v - ''' - - if len(args) > 2: - raise TypeError('update() takes at most 2 positional ' - 'arguments (%d given)' % (len(args),)) - elif not args: - raise TypeError('update() takes at least 1 argument (0 given)') - self = args[0] - # Make progressively weaker assumptions about "other" - other = () - if len(args) == 2: - other = args[1] - if isinstance(other, dict): - for key in other: - self[key] = other[key] - elif hasattr(other, 'keys'): - for key in other.keys(): - self[key] = other[key] - else: - for key, value in other: - self[key] = value - for key, value in kwds.items(): - self[key] = value - - __update = update # let subclasses override update - # without breaking __init__ - - __marker = object() - - def pop(self, key, default=__marker): - '''od.pop(k[,d]) -> v - remove specified key and return the corresponding value. - If key is not found, d is returned if given, - otherwise KeyError is raised. - - ''' - if key in self: - result = self[key] - del self[key] - return result - if default is self.__marker: - raise KeyError(key) - return default - - def setdefault(self, key, default=None): - 'od.setdefault(k[,d]) -> od.get(k,d), also set od[k]=d if k not in od' - if key in self: - return self[key] - self[key] = default - return default - - def __repr__(self, _repr_running={}): - 'od.__repr__() <==> repr(od)' - call_key = id(self), _get_ident() - if call_key in _repr_running: - return '...' - _repr_running[call_key] = 1 - try: - if not self: - return '%s()' % (self.__class__.__name__,) - return '%s(%r)' % (self.__class__.__name__, self.items()) - finally: - del _repr_running[call_key] - - def __reduce__(self): - 'Return state information for pickling' - items = [[k, self[k]] for k in self] - inst_dict = vars(self).copy() - for k in vars(OrderedDict()): - inst_dict.pop(k, None) - if inst_dict: - return (self.__class__, (items,), inst_dict) - return self.__class__, (items,) - - def copy(self): - 'od.copy() -> a shallow copy of od' - return self.__class__(self) - - @classmethod - def fromkeys(cls, iterable, value=None): - '''OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S - and values equal to v (which defaults to None). - - ''' - d = cls() - for key in iterable: - d[key] = value - return d - - def __eq__(self, other): - '''od.__eq__(y) <==> od==y. - Comparison to another OD is order-sensitive - while comparison to a regular mapping is order-insensitive. - ''' - if isinstance(other, OrderedDict): - return len(self) == len(other) and self.items() == other.items() - return dict.__eq__(self, other) - - def __ne__(self, other): - return not self == other - - # -- the following methods are only used in Python 2.7 -- - - def viewkeys(self): - "od.viewkeys() -> a set-like object providing a view on od's keys" - return KeysView(self) - - def viewvalues(self): - "od.viewvalues() -> an object providing a view on od's values" - return ValuesView(self) - - def viewitems(self): - "od.viewitems() -> a set-like object providing a view on od's items" - return ItemsView(self) diff --git a/src/leap/util/fileutil.py b/src/leap/util/fileutil.py deleted file mode 100644 index 820ffe46..00000000 --- a/src/leap/util/fileutil.py +++ /dev/null @@ -1,120 +0,0 @@ -import errno -from itertools import chain -import logging -import os -import platform -import stat - - -logger = logging.getLogger() - - -def is_user_executable(fpath): - st = os.stat(fpath) - return bool(st.st_mode & stat.S_IXUSR) - - -def extend_path(): - ourplatform = platform.system() - if ourplatform == "Linux": - return "/usr/local/sbin:/usr/sbin" - # XXX add mac / win extended search paths? - - -def which(program, path=None): - """ - an implementation of which - that extends the path with - other locations, like sbin - (f.i., openvpn binary is likely to be there) - @param program: a string representing the binary we're looking for. - """ - def is_exe(fpath): - """ - check that path exists, - it's a file, - and is executable by the owner - """ - # we would check for access, - # but it's likely that we're - # using uid 0 + polkitd - - return os.path.isfile(fpath)\ - and is_user_executable(fpath) - - def ext_candidates(fpath): - yield fpath - for ext in os.environ.get("PATHEXT", "").split(os.pathsep): - yield fpath + ext - - def iter_path(pathset): - """ - returns iterator with - full path for a given path list - and the current target bin. - """ - for path in pathset.split(os.pathsep): - exe_file = os.path.join(path, program) - #print 'file=%s' % exe_file - for candidate in ext_candidates(exe_file): - if is_exe(candidate): - yield candidate - - fpath, fname = os.path.split(program) - if fpath: - if is_exe(program): - return program - else: - # extended iterator - # with extra path - if path is None: - path = os.environ['PATH'] - extended_path = chain( - iter_path(path), - iter_path(extend_path())) - for candidate in extended_path: - if candidate is not None: - return candidate - - # sorry bro. - return None - - -def mkdir_p(path): - """ - implements mkdir -p functionality - """ - try: - os.makedirs(path) - except OSError as exc: - if exc.errno == errno.EEXIST: - pass - else: - raise - - -def mkdir_f(path): - folder, fname = os.path.split(path) - mkdir_p(folder) - - -def check_and_fix_urw_only(_file): - """ - test for 600 mode and try - to set it if anything different found - """ - mode = stat.S_IMODE( - os.stat(_file).st_mode) - - if mode != int('600', 8): - try: - logger.warning( - 'bad permission on %s ' - 'attempting to set 600', - _file) - os.chmod(_file, stat.S_IRUSR | stat.S_IWUSR) - except OSError: - logger.error( - 'error while trying to chmod 600 %s', - _file) - raise diff --git a/src/leap/util/geo.py b/src/leap/util/geo.py deleted file mode 100644 index 54b29596..00000000 --- a/src/leap/util/geo.py +++ /dev/null @@ -1,32 +0,0 @@ -""" -experimental geo support. -not yet a feature. -in debian, we rely on the (optional) geoip-database -""" -import os -import platform - -from leap.util import HAS_GEOIP - -GEOIP = None - -if HAS_GEOIP: - import pygeoip # we know we can :) - - GEOIP_PATH = None - - if platform.system() == "Linux": - PATH = "/usr/share/GeoIP/GeoIP.dat" - if os.path.isfile(PATH): - GEOIP_PATH = PATH - GEOIP = pygeoip.GeoIP(GEOIP_PATH, pygeoip.MEMORY_CACHE) - - -def get_country_name(ip): - if not GEOIP: - return - try: - country = GEOIP.country_name_by_addr(ip) - except pygeoip.GeoIPError: - country = None - return country if country else "-" diff --git a/src/leap/util/leap_argparse.py b/src/leap/util/leap_argparse.py deleted file mode 100644 index 3412a72c..00000000 --- a/src/leap/util/leap_argparse.py +++ /dev/null @@ -1,44 +0,0 @@ -import argparse - - -def build_parser(): - """ - all the options for the leap arg parser - Some of these could be switched on only if debug flag is present! - """ - epilog = "Copyright 2012 The LEAP Encryption Access Project" - parser = argparse.ArgumentParser(description=""" -Launches the LEAP Client""", epilog=epilog) - parser.add_argument('-d', '--debug', action="store_true", - help=("Launches client in debug mode, writing debug" - "info to stdout")) - parser.add_argument('-l', '--logfile', metavar="LOG FILE", nargs='?', - action="store", dest="log_file", - #type=argparse.FileType('w'), - help='optional log file') - parser.add_argument('--openvpn-verbosity', nargs='?', - type=int, - action="store", dest="openvpn_verb", - help='verbosity level for openvpn logs [1-6]') - - # Not in use, we might want to reintroduce them. - #parser.add_argument('-i', '--no-provider-checks', - #action="store_true", default=False, - #help="skips download of provider config files. gets " - #"config from local files only. Will fail if cannot " - #"find any") - #parser.add_argument('-k', '--no-ca-verify', - #action="store_true", default=False, - #help="(insecure). Skips verification of the server " - #"certificate used in TLS handshake.") - #parser.add_argument('-c', '--config', metavar="CONFIG FILE", nargs='?', - #action="store", dest="config_file", - #type=argparse.FileType('r'), - #help='optional config file') - return parser - - -def init_leapc_args(): - parser = build_parser() - opts, unknown = parser.parse_known_args() - return parser, opts diff --git a/src/leap/util/misc.py b/src/leap/util/misc.py deleted file mode 100644 index d869a1ba..00000000 --- a/src/leap/util/misc.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -misc utils -""" -import psutil - -from leap.base.constants import OPENVPN_BIN - - -class ImproperlyConfigured(Exception): - """ - """ - - -def null_check(value, value_name): - try: - assert value is not None - except AssertionError: - raise ImproperlyConfigured( - "%s parameter cannot be None" % value_name) - - -def get_openvpn_pids(): - # binary name might change - - openvpn_pids = [] - for p in psutil.process_iter(): - try: - # XXX Not exact! - # Will give false positives. - # we should check that cmdline BEGINS - # with openvpn or with our wrapper - # (pkexec / osascript / whatever) - if OPENVPN_BIN in ' '.join(p.cmdline): - openvpn_pids.append(p.pid) - except psutil.error.AccessDenied: - pass - return openvpn_pids diff --git a/src/leap/util/tests/__init__.py b/src/leap/util/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/src/leap/util/tests/__init__.py +++ /dev/null diff --git a/src/leap/util/tests/test_fileutil.py b/src/leap/util/tests/test_fileutil.py deleted file mode 100644 index f5131b3d..00000000 --- a/src/leap/util/tests/test_fileutil.py +++ /dev/null @@ -1,100 +0,0 @@ -import os -import platform -import shutil -import stat -import tempfile -import unittest - -from leap.util import fileutil - - -class FileUtilTest(unittest.TestCase): - """ - test our file utils - """ - - def setUp(self): - self.system = platform.system() - self.create_temp_dir() - - def tearDown(self): - self.remove_temp_dir() - - # - # helpers - # - - def create_temp_dir(self): - self.tmpdir = tempfile.mkdtemp() - - def remove_temp_dir(self): - shutil.rmtree(self.tmpdir) - - def get_file_path(self, filename): - return os.path.join( - self.tmpdir, - filename) - - def touch_exec_file(self): - fp = self.get_file_path('testexec') - open(fp, 'w').close() - os.chmod( - fp, - stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - return fp - - def get_mode(self, fp): - return stat.S_IMODE(os.stat(fp).st_mode) - - # - # tests - # - - def test_is_user_executable(self): - """ - touch_exec_file creates in mode 700? - """ - # XXX could check access X_OK - - fp = self.touch_exec_file() - mode = self.get_mode(fp) - self.assertEqual(mode, int('700', 8)) - - def test_which(self): - """ - which implementation ok? - not a very reliable test, - but I cannot think of anything smarter now - I guess it's highly improbable that copy - """ - # XXX yep, we can change the syspath - # for the test... ! - - if self.system == "Linux": - self.assertEqual( - fileutil.which('cp'), - '/bin/cp') - - def test_mkdir_p(self): - """ - our own mkdir -p implementation ok? - """ - testdir = self.get_file_path( - os.path.join('test', 'foo', 'bar')) - self.assertEqual(os.path.isdir(testdir), False) - fileutil.mkdir_p(testdir) - self.assertEqual(os.path.isdir(testdir), True) - - def test_check_and_fix_urw_only(self): - """ - ensure check_and_fix_urx_only ok? - """ - fp = self.touch_exec_file() - mode = self.get_mode(fp) - self.assertEqual(mode, int('700', 8)) - fileutil.check_and_fix_urw_only(fp) - mode = self.get_mode(fp) - self.assertEqual(mode, int('600', 8)) - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/util/tests/test_leap_argparse.py b/src/leap/util/tests/test_leap_argparse.py deleted file mode 100644 index 4e2b811f..00000000 --- a/src/leap/util/tests/test_leap_argparse.py +++ /dev/null @@ -1,35 +0,0 @@ -from argparse import Namespace -import unittest - -from leap.util import leap_argparse - - -class LeapArgParseTest(unittest.TestCase): - """ - Test argparse options for eip client - """ - - def setUp(self): - """ - get the parser - """ - self.parser = leap_argparse.build_parser() - - def test_debug_mode(self): - """ - test debug mode option - """ - opts = self.parser.parse_args( - ['--debug']) - self.assertEqual( - opts, - Namespace( - debug=True, - log_file=None, - #config_file=None, - #no_provider_checks=False, - #no_ca_verify=False, - openvpn_verb=None)) - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/util/tests/test_translations.py b/src/leap/util/tests/test_translations.py deleted file mode 100644 index 794daeba..00000000 --- a/src/leap/util/tests/test_translations.py +++ /dev/null @@ -1,22 +0,0 @@ -import unittest - -from leap.util import translations - - -class TrasnlationsTestCase(unittest.TestCase): - """ - tests for translation functions and classes - """ - - def setUp(self): - self.trClass = translations.LEAPTranslatable - - def test_trasnlatable(self): - tr = self.trClass({"en": "house", "es": "casa"}) - eq = self.assertEqual - eq(tr.tr(to="es"), "casa") - eq(tr.tr(to="en"), "house") - - -if __name__ == "__main__": - unittest.main() diff --git a/src/leap/util/translations.py b/src/leap/util/translations.py deleted file mode 100644 index f55c8fba..00000000 --- a/src/leap/util/translations.py +++ /dev/null @@ -1,82 +0,0 @@ -import inspect -import logging - -from PyQt4.QtCore import QCoreApplication -from PyQt4.QtCore import QLocale - -logger = logging.getLogger(__name__) - -""" -here I could not do all that I wanted. -the context is not getting passed to the xml file. -Looks like pylupdate4 is somehow a hack that does not -parse too well the python ast. -I guess we could generate the xml for ourselves as a last recourse. -""" - -# XXX BIG NOTE: -# RESIST the temptation to get the translate function -# more compact, or have the Context argument passed as a variable -# Its name HAS to be explicit due to how the pylupdate parser -# works. - - -qtTranslate = QCoreApplication.translate - - -def translate(*args, **kwargs): - """ - our magic function. - translate(Context, text, comment) - """ - if len(args) == 1: - obj = args[0] - if isinstance(obj, LEAPTranslatable) and hasattr(obj, 'tr'): - return obj.tr() - - klsname = None - try: - # get class value from instance - # using live object inspection - prev_frame = inspect.stack()[1][0] - locals_ = inspect.getargvalues(prev_frame).locals - self = locals_.get('self') - if self: - - # Trying to get the class name - # but this is useless, the parser - # has already got the context. - klsname = self.__class__.__name__ - #print 'KLSNAME -- ', klsname - except: - logger.error('error getting stack frame') - - if klsname and len(args) == 1: - nargs = (klsname,) + args - return qtTranslate(*nargs) - - else: - return qtTranslate(*args) - - -class LEAPTranslatable(dict): - """ - An extended dict that implements a .tr method - so it can be translated on the fly by our - magic translate method - """ - - try: - locale = str(QLocale.system().name()).split('_')[0] - except: - logger.warning("could not get system locale!") - print "could not get system locale!" - locale = "en" - - def tr(self, to=None): - if not to: - to = self.locale - _tr = self.get(to, None) - if not _tr: - _tr = self.get("en", None) - return _tr diff --git a/src/leap/util/web.py b/src/leap/util/web.py deleted file mode 100644 index 15de0561..00000000 --- a/src/leap/util/web.py +++ /dev/null @@ -1,40 +0,0 @@ -""" -web related utilities -""" - - -class UsageError(Exception): - """ """ - - -def get_https_domain_and_port(full_domain): - """ - returns a tuple with domain and port - from a full_domain string that can - contain a colon - """ - full_domain = unicode(full_domain) - if full_domain is None: - return None, None - - https_sch = "https://" - http_sch = "http://" - - if full_domain.startswith(https_sch): - full_domain = full_domain.lstrip(https_sch) - elif full_domain.startswith(http_sch): - raise UsageError( - "cannot be called with a domain " - "that begins with 'http://'") - - domain_split = full_domain.split(':') - _len = len(domain_split) - if _len == 1: - domain, port = full_domain, 443 - elif _len == 2: - domain, port = domain_split - else: - raise UsageError( - "must be called with one only parameter" - "in the form domain[:port]") - return domain, port |