path: root/src/leap/util
diff options
authorTomás Touceda <>2013-03-06 15:27:23 -0300
committerTomás Touceda <>2013-03-06 15:27:23 -0300
commit5ff29dc57e2877a14e705d09b7042cddf4165d0a (patch)
tree18e5817a105f0aaa7a9a752f7b644e44a6c867bc /src/leap/util
parente23553caaf93a734578b02f9130dee38161d0e22 (diff)
Remove everything to start from scratch
Diffstat (limited to 'src/leap/util')
14 files changed, 0 insertions, 916 deletions
diff --git a/src/leap/util/ b/src/leap/util/
deleted file mode 100644
index a70a9a8b..00000000
--- a/src/leap/util/
+++ /dev/null
@@ -1,9 +0,0 @@
-import logging
-logger = logging.getLogger(__name__)
- import pygeoip
- HAS_GEOIP = True
-except ImportError:
- logger.debug('PyGeoIP not found. Disabled Geo support.')
- HAS_GEOIP = False
diff --git a/src/leap/util/ b/src/leap/util/
deleted file mode 100644
index f0f790e9..00000000
--- a/src/leap/util/
+++ /dev/null
@@ -1,18 +0,0 @@
-import os
-import logging
-logger = logging.getLogger(__name__)
-def get_mac_cabundle():
- # hackaround bundle error
- # XXX this needs a better fix!
- f = os.path.split(__file__)[0]
- sep = os.path.sep
- f_ = sep.join(f.split(sep)[:-2])
- verify = os.path.join(f_, 'cacert.pem')
- #logger.error('VERIFY PATH = %s' % verify)
- exists = os.path.isfile(verify)
- #logger.error('do exist? %s', exists)
- if exists:
- return verify
diff --git a/src/leap/util/ b/src/leap/util/
deleted file mode 100644
index 0657fc04..00000000
--- a/src/leap/util/
+++ /dev/null
@@ -1,109 +0,0 @@
-# the problem of watching a stdout pipe from
-# openvpn binary: using subprocess and coroutines
-# acting as event consumers
-from __future__ import division, print_function
-import logging
-from subprocess import PIPE, Popen
-import sys
-from threading import Thread
-logger = logging.getLogger(__name__)
-ON_POSIX = 'posix' in sys.builtin_module_names
-# Coroutines goodies
-def coroutine(func):
- def start(*args, **kwargs):
- cr = func(*args, **kwargs)
- return cr
- return start
-def process_events(callback):
- """
- coroutine loop that receives
- events sent and dispatch the callback.
- :param callback: callback to be called\
-for each event
- :type callback: callable
- """
- try:
- while True:
- m = (yield)
- if callable(callback):
- callback(m)
- else:
- logger.debug('not a callable passed')
- except GeneratorExit:
- return
-# Threads
-def launch_thread(target, args):
- """
- launch and demonize thread.
- :param target: target function that will run in thread
- :type target: function
- :param args: args to be passed to thread
- :type args: list
- """
- t = Thread(target=target,
- args=args)
- t.daemon = True
- t.start()
- return t
-def watch_output(out, observers):
- """
- initializes dict of observer coroutines
- and pushes lines to each of them as they are received
- from the watched output.
- :param out: stdout of a process.
- :type out: fd
- :param observers: tuple of coroutines to send data\
-for each event
- :type observers: tuple
- """
- observer_dict = dict(((observer, process_events(observer))
- for observer in observers))
- for line in iter(out.readline, b''):
- for obs in observer_dict:
- observer_dict[obs].send(line)
- out.close()
-def spawn_and_watch_process(command, args, observers=None):
- """
- spawns a subprocess with command, args, and launch
- a watcher thread.
- :param command: command to be executed in the subprocess
- :type command: str
- :param args: arguments
- :type args: list
- :param observers: tuple of observer functions to be called \
-for each line in the subprocess output.
- :type observers: tuple
- :return: a tuple containing the child process instance, and watcher_thread,
- :rtype: (Subprocess, Thread)
- """
- subp = Popen([command] + args,
- stdout=PIPE,
- stderr=PIPE,
- bufsize=1,
- close_fds=ON_POSIX)
- watcher = launch_thread(
- watch_output,
- (subp.stdout, observers))
- return subp, watcher
diff --git a/src/leap/util/ b/src/leap/util/
deleted file mode 100644
index 001ca96b..00000000
--- a/src/leap/util/
+++ /dev/null
@@ -1,268 +0,0 @@
-# Backport of OrderedDict() class that runs
-# on Python 2.4, 2.5, 2.6, 2.7 and pypy.
-# Passes Python2.7's test suite and incorporates all the latest updates.
- from thread import get_ident as _get_ident
-except ImportError:
- from dummy_thread import get_ident as _get_ident
- from _abcoll import KeysView, ValuesView, ItemsView
-except ImportError:
- pass
-class OrderedDict(dict):
- 'Dictionary that remembers insertion order'
- # An inherited dict maps keys to values.
- # The inherited dict provides __getitem__, __len__, __contains__, and get.
- # The remaining methods are order-aware.
- # Big-O running times for all methods are the same as for regular
- # dictionaries.
- # The internal self.__map dictionary maps keys to links in a doubly
- # linked list.
- # The circular doubly linked list starts and ends with a sentinel element.
- # The sentinel element never gets deleted (this simplifies the algorithm).
- # Each link is stored as a list of length three: [PREV, NEXT, KEY].
- def __init__(self, *args, **kwds):
- '''Initialize an ordered dictionary. Signature is the same as for
- regular dictionaries, but keyword arguments are not recommended
- because their insertion order is arbitrary.
- '''
- if len(args) > 1:
- raise TypeError('expected at most 1 arguments, got %d' % len(args))
- try:
- self.__root
- except AttributeError:
- self.__root = root = [] # sentinel node
- root[:] = [root, root, None]
- self.__map = {}
- self.__update(*args, **kwds)
- def __setitem__(self, key, value, dict_setitem=dict.__setitem__):
- 'od.__setitem__(i, y) <==> od[i]=y'
- # Setting a new item creates a new link which goes at the end
- # of the linked list, and the inherited dictionary is updated
- # with the new key/value pair.
- if key not in self:
- root = self.__root
- last = root[0]
- last[1] = root[0] = self.__map[key] = [last, root, key]
- dict_setitem(self, key, value)
- def __delitem__(self, key, dict_delitem=dict.__delitem__):
- 'od.__delitem__(y) <==> del od[y]'
- # Deleting an existing item uses self.__map to find the link which is
- # then removed by updating the links in the predecessor and successor
- # nodes.
- dict_delitem(self, key)
- link_prev, link_next, key = self.__map.pop(key)
- link_prev[1] = link_next
- link_next[0] = link_prev
- def __iter__(self):
- 'od.__iter__() <==> iter(od)'
- root = self.__root
- curr = root[1]
- while curr is not root:
- yield curr[2]
- curr = curr[1]
- def __reversed__(self):
- 'od.__reversed__() <==> reversed(od)'
- root = self.__root
- curr = root[0]
- while curr is not root:
- yield curr[2]
- curr = curr[0]
- def clear(self):
- 'od.clear() -> None. Remove all items from od.'
- try:
- for node in self.__map.itervalues():
- del node[:]
- root = self.__root
- root[:] = [root, root, None]
- self.__map.clear()
- except AttributeError:
- pass
- dict.clear(self)
- def popitem(self, last=True):
- '''od.popitem() -> (k, v), return and remove a (key, value) pair.
- Pairs are returned in LIFO order if last is true or FIFO order if
- false.
- '''
- if not self:
- raise KeyError('dictionary is empty')
- root = self.__root
- if last:
- link = root[0]
- link_prev = link[0]
- link_prev[1] = root
- root[0] = link_prev
- else:
- link = root[1]
- link_next = link[1]
- root[1] = link_next
- link_next[0] = root
- key = link[2]
- del self.__map[key]
- value = dict.pop(self, key)
- return key, value
- # -- the following methods do not depend on the internal structure --
- def keys(self):
- 'od.keys() -> list of keys in od'
- return list(self)
- def values(self):
- 'od.values() -> list of values in od'
- return [self[key] for key in self]
- def items(self):
- 'od.items() -> list of (key, value) pairs in od'
- return [(key, self[key]) for key in self]
- def iterkeys(self):
- 'od.iterkeys() -> an iterator over the keys in od'
- return iter(self)
- def itervalues(self):
- 'od.itervalues -> an iterator over the values in od'
- for k in self:
- yield self[k]
- def iteritems(self):
- 'od.iteritems -> an iterator over the (key, value) items in od'
- for k in self:
- yield (k, self[k])
- def update(*args, **kwds):
- '''od.update(E, **F) -> None. Update od from dict/iterable E and F.
- If E is a dict instance, does: for k in E: od[k] = E[k]
- If E has a .keys() method, does: for k in E.keys():
- od[k] = E[k]
- Or if E is an iterable of items, does: for k, v in E: od[k] = v
- In either case, this is followed by: for k, v in F.items():
- od[k] = v
- '''
- if len(args) > 2:
- raise TypeError('update() takes at most 2 positional '
- 'arguments (%d given)' % (len(args),))
- elif not args:
- raise TypeError('update() takes at least 1 argument (0 given)')
- self = args[0]
- # Make progressively weaker assumptions about "other"
- other = ()
- if len(args) == 2:
- other = args[1]
- if isinstance(other, dict):
- for key in other:
- self[key] = other[key]
- elif hasattr(other, 'keys'):
- for key in other.keys():
- self[key] = other[key]
- else:
- for key, value in other:
- self[key] = value
- for key, value in kwds.items():
- self[key] = value
- __update = update # let subclasses override update
- # without breaking __init__
- __marker = object()
- def pop(self, key, default=__marker):
- '''od.pop(k[,d]) -> v
- remove specified key and return the corresponding value.
- If key is not found, d is returned if given,
- otherwise KeyError is raised.
- '''
- if key in self:
- result = self[key]
- del self[key]
- return result
- if default is self.__marker:
- raise KeyError(key)
- return default
- def setdefault(self, key, default=None):
- 'od.setdefault(k[,d]) -> od.get(k,d), also set od[k]=d if k not in od'
- if key in self:
- return self[key]
- self[key] = default
- return default
- def __repr__(self, _repr_running={}):
- 'od.__repr__() <==> repr(od)'
- call_key = id(self), _get_ident()
- if call_key in _repr_running:
- return '...'
- _repr_running[call_key] = 1
- try:
- if not self:
- return '%s()' % (self.__class__.__name__,)
- return '%s(%r)' % (self.__class__.__name__, self.items())
- finally:
- del _repr_running[call_key]
- def __reduce__(self):
- 'Return state information for pickling'
- items = [[k, self[k]] for k in self]
- inst_dict = vars(self).copy()
- for k in vars(OrderedDict()):
- inst_dict.pop(k, None)
- if inst_dict:
- return (self.__class__, (items,), inst_dict)
- return self.__class__, (items,)
- def copy(self):
- 'od.copy() -> a shallow copy of od'
- return self.__class__(self)
- @classmethod
- def fromkeys(cls, iterable, value=None):
- '''OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S
- and values equal to v (which defaults to None).
- '''
- d = cls()
- for key in iterable:
- d[key] = value
- return d
- def __eq__(self, other):
- '''od.__eq__(y) <==> od==y.
- Comparison to another OD is order-sensitive
- while comparison to a regular mapping is order-insensitive.
- '''
- if isinstance(other, OrderedDict):
- return len(self) == len(other) and self.items() == other.items()
- return dict.__eq__(self, other)
- def __ne__(self, other):
- return not self == other
- # -- the following methods are only used in Python 2.7 --
- def viewkeys(self):
- "od.viewkeys() -> a set-like object providing a view on od's keys"
- return KeysView(self)
- def viewvalues(self):
- "od.viewvalues() -> an object providing a view on od's values"
- return ValuesView(self)
- def viewitems(self):
- "od.viewitems() -> a set-like object providing a view on od's items"
- return ItemsView(self)
diff --git a/src/leap/util/ b/src/leap/util/
deleted file mode 100644
index 820ffe46..00000000
--- a/src/leap/util/
+++ /dev/null
@@ -1,120 +0,0 @@
-import errno
-from itertools import chain
-import logging
-import os
-import platform
-import stat
-logger = logging.getLogger()
-def is_user_executable(fpath):
- st = os.stat(fpath)
- return bool(st.st_mode & stat.S_IXUSR)
-def extend_path():
- ourplatform = platform.system()
- if ourplatform == "Linux":
- return "/usr/local/sbin:/usr/sbin"
- # XXX add mac / win extended search paths?
-def which(program, path=None):
- """
- an implementation of which
- that extends the path with
- other locations, like sbin
- (f.i., openvpn binary is likely to be there)
- @param program: a string representing the binary we're looking for.
- """
- def is_exe(fpath):
- """
- check that path exists,
- it's a file,
- and is executable by the owner
- """
- # we would check for access,
- # but it's likely that we're
- # using uid 0 + polkitd
- return os.path.isfile(fpath)\
- and is_user_executable(fpath)
- def ext_candidates(fpath):
- yield fpath
- for ext in os.environ.get("PATHEXT", "").split(os.pathsep):
- yield fpath + ext
- def iter_path(pathset):
- """
- returns iterator with
- full path for a given path list
- and the current target bin.
- """
- for path in pathset.split(os.pathsep):
- exe_file = os.path.join(path, program)
- #print 'file=%s' % exe_file
- for candidate in ext_candidates(exe_file):
- if is_exe(candidate):
- yield candidate
- fpath, fname = os.path.split(program)
- if fpath:
- if is_exe(program):
- return program
- else:
- # extended iterator
- # with extra path
- if path is None:
- path = os.environ['PATH']
- extended_path = chain(
- iter_path(path),
- iter_path(extend_path()))
- for candidate in extended_path:
- if candidate is not None:
- return candidate
- # sorry bro.
- return None
-def mkdir_p(path):
- """
- implements mkdir -p functionality
- """
- try:
- os.makedirs(path)
- except OSError as exc:
- if exc.errno == errno.EEXIST:
- pass
- else:
- raise
-def mkdir_f(path):
- folder, fname = os.path.split(path)
- mkdir_p(folder)
-def check_and_fix_urw_only(_file):
- """
- test for 600 mode and try
- to set it if anything different found
- """
- mode = stat.S_IMODE(
- os.stat(_file).st_mode)
- if mode != int('600', 8):
- try:
- logger.warning(
- 'bad permission on %s '
- 'attempting to set 600',
- _file)
- os.chmod(_file, stat.S_IRUSR | stat.S_IWUSR)
- except OSError:
- logger.error(
- 'error while trying to chmod 600 %s',
- _file)
- raise
diff --git a/src/leap/util/ b/src/leap/util/
deleted file mode 100644
index 54b29596..00000000
--- a/src/leap/util/
+++ /dev/null
@@ -1,32 +0,0 @@
-experimental geo support.
-not yet a feature.
-in debian, we rely on the (optional) geoip-database
-import os
-import platform
-from leap.util import HAS_GEOIP
-GEOIP = None
- import pygeoip # we know we can :)
- if platform.system() == "Linux":
- PATH = "/usr/share/GeoIP/GeoIP.dat"
- if os.path.isfile(PATH):
- GEOIP = pygeoip.GeoIP(GEOIP_PATH, pygeoip.MEMORY_CACHE)
-def get_country_name(ip):
- if not GEOIP:
- return
- try:
- country = GEOIP.country_name_by_addr(ip)
- except pygeoip.GeoIPError:
- country = None
- return country if country else "-"
diff --git a/src/leap/util/ b/src/leap/util/
deleted file mode 100644
index 3412a72c..00000000
--- a/src/leap/util/
+++ /dev/null
@@ -1,44 +0,0 @@
-import argparse
-def build_parser():
- """
- all the options for the leap arg parser
- Some of these could be switched on only if debug flag is present!
- """
- epilog = "Copyright 2012 The LEAP Encryption Access Project"
- parser = argparse.ArgumentParser(description="""
-Launches the LEAP Client""", epilog=epilog)
- parser.add_argument('-d', '--debug', action="store_true",
- help=("Launches client in debug mode, writing debug"
- "info to stdout"))
- parser.add_argument('-l', '--logfile', metavar="LOG FILE", nargs='?',
- action="store", dest="log_file",
- #type=argparse.FileType('w'),
- help='optional log file')
- parser.add_argument('--openvpn-verbosity', nargs='?',
- type=int,
- action="store", dest="openvpn_verb",
- help='verbosity level for openvpn logs [1-6]')
- # Not in use, we might want to reintroduce them.
- #parser.add_argument('-i', '--no-provider-checks',
- #action="store_true", default=False,
- #help="skips download of provider config files. gets "
- #"config from local files only. Will fail if cannot "
- #"find any")
- #parser.add_argument('-k', '--no-ca-verify',
- #action="store_true", default=False,
- #help="(insecure). Skips verification of the server "
- #"certificate used in TLS handshake.")
- #parser.add_argument('-c', '--config', metavar="CONFIG FILE", nargs='?',
- #action="store", dest="config_file",
- #type=argparse.FileType('r'),
- #help='optional config file')
- return parser
-def init_leapc_args():
- parser = build_parser()
- opts, unknown = parser.parse_known_args()
- return parser, opts
diff --git a/src/leap/util/ b/src/leap/util/
deleted file mode 100644
index d869a1ba..00000000
--- a/src/leap/util/
+++ /dev/null
@@ -1,37 +0,0 @@
-misc utils
-import psutil
-from leap.base.constants import OPENVPN_BIN
-class ImproperlyConfigured(Exception):
- """
- """
-def null_check(value, value_name):
- try:
- assert value is not None
- except AssertionError:
- raise ImproperlyConfigured(
- "%s parameter cannot be None" % value_name)
-def get_openvpn_pids():
- # binary name might change
- openvpn_pids = []
- for p in psutil.process_iter():
- try:
- # XXX Not exact!
- # Will give false positives.
- # we should check that cmdline BEGINS
- # with openvpn or with our wrapper
- # (pkexec / osascript / whatever)
- if OPENVPN_BIN in ' '.join(p.cmdline):
- openvpn_pids.append(
- except psutil.error.AccessDenied:
- pass
- return openvpn_pids
diff --git a/src/leap/util/tests/ b/src/leap/util/tests/
deleted file mode 100644
index e69de29b..00000000
--- a/src/leap/util/tests/
+++ /dev/null
diff --git a/src/leap/util/tests/ b/src/leap/util/tests/
deleted file mode 100644
index f5131b3d..00000000
--- a/src/leap/util/tests/
+++ /dev/null
@@ -1,100 +0,0 @@
-import os
-import platform
-import shutil
-import stat
-import tempfile
-import unittest
-from leap.util import fileutil
-class FileUtilTest(unittest.TestCase):
- """
- test our file utils
- """
- def setUp(self):
- self.system = platform.system()
- self.create_temp_dir()
- def tearDown(self):
- self.remove_temp_dir()
- #
- # helpers
- #
- def create_temp_dir(self):
- self.tmpdir = tempfile.mkdtemp()
- def remove_temp_dir(self):
- shutil.rmtree(self.tmpdir)
- def get_file_path(self, filename):
- return os.path.join(
- self.tmpdir,
- filename)
- def touch_exec_file(self):
- fp = self.get_file_path('testexec')
- open(fp, 'w').close()
- os.chmod(
- fp,
- stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
- return fp
- def get_mode(self, fp):
- return stat.S_IMODE(os.stat(fp).st_mode)
- #
- # tests
- #
- def test_is_user_executable(self):
- """
- touch_exec_file creates in mode 700?
- """
- # XXX could check access X_OK
- fp = self.touch_exec_file()
- mode = self.get_mode(fp)
- self.assertEqual(mode, int('700', 8))
- def test_which(self):
- """
- which implementation ok?
- not a very reliable test,
- but I cannot think of anything smarter now
- I guess it's highly improbable that copy
- """
- # XXX yep, we can change the syspath
- # for the test... !
- if self.system == "Linux":
- self.assertEqual(
- fileutil.which('cp'),
- '/bin/cp')
- def test_mkdir_p(self):
- """
- our own mkdir -p implementation ok?
- """
- testdir = self.get_file_path(
- os.path.join('test', 'foo', 'bar'))
- self.assertEqual(os.path.isdir(testdir), False)
- fileutil.mkdir_p(testdir)
- self.assertEqual(os.path.isdir(testdir), True)
- def test_check_and_fix_urw_only(self):
- """
- ensure check_and_fix_urx_only ok?
- """
- fp = self.touch_exec_file()
- mode = self.get_mode(fp)
- self.assertEqual(mode, int('700', 8))
- fileutil.check_and_fix_urw_only(fp)
- mode = self.get_mode(fp)
- self.assertEqual(mode, int('600', 8))
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/leap/util/tests/ b/src/leap/util/tests/
deleted file mode 100644
index 4e2b811f..00000000
--- a/src/leap/util/tests/
+++ /dev/null
@@ -1,35 +0,0 @@
-from argparse import Namespace
-import unittest
-from leap.util import leap_argparse
-class LeapArgParseTest(unittest.TestCase):
- """
- Test argparse options for eip client
- """
- def setUp(self):
- """
- get the parser
- """
- self.parser = leap_argparse.build_parser()
- def test_debug_mode(self):
- """
- test debug mode option
- """
- opts = self.parser.parse_args(
- ['--debug'])
- self.assertEqual(
- opts,
- Namespace(
- debug=True,
- log_file=None,
- #config_file=None,
- #no_provider_checks=False,
- #no_ca_verify=False,
- openvpn_verb=None))
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/leap/util/tests/ b/src/leap/util/tests/
deleted file mode 100644
index 794daeba..00000000
--- a/src/leap/util/tests/
+++ /dev/null
@@ -1,22 +0,0 @@
-import unittest
-from leap.util import translations
-class TrasnlationsTestCase(unittest.TestCase):
- """
- tests for translation functions and classes
- """
- def setUp(self):
- self.trClass = translations.LEAPTranslatable
- def test_trasnlatable(self):
- tr = self.trClass({"en": "house", "es": "casa"})
- eq = self.assertEqual
- eq("es"), "casa")
- eq("en"), "house")
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/leap/util/ b/src/leap/util/
deleted file mode 100644
index f55c8fba..00000000
--- a/src/leap/util/
+++ /dev/null
@@ -1,82 +0,0 @@
-import inspect
-import logging
-from PyQt4.QtCore import QCoreApplication
-from PyQt4.QtCore import QLocale
-logger = logging.getLogger(__name__)
-here I could not do all that I wanted.
-the context is not getting passed to the xml file.
-Looks like pylupdate4 is somehow a hack that does not
-parse too well the python ast.
-I guess we could generate the xml for ourselves as a last recourse.
-# RESIST the temptation to get the translate function
-# more compact, or have the Context argument passed as a variable
-# Its name HAS to be explicit due to how the pylupdate parser
-# works.
-qtTranslate = QCoreApplication.translate
-def translate(*args, **kwargs):
- """
- our magic function.
- translate(Context, text, comment)
- """
- if len(args) == 1:
- obj = args[0]
- if isinstance(obj, LEAPTranslatable) and hasattr(obj, 'tr'):
- return
- klsname = None
- try:
- # get class value from instance
- # using live object inspection
- prev_frame = inspect.stack()[1][0]
- locals_ = inspect.getargvalues(prev_frame).locals
- self = locals_.get('self')
- if self:
- # Trying to get the class name
- # but this is useless, the parser
- # has already got the context.
- klsname = self.__class__.__name__
- #print 'KLSNAME -- ', klsname
- except:
- logger.error('error getting stack frame')
- if klsname and len(args) == 1:
- nargs = (klsname,) + args
- return qtTranslate(*nargs)
- else:
- return qtTranslate(*args)
-class LEAPTranslatable(dict):
- """
- An extended dict that implements a .tr method
- so it can be translated on the fly by our
- magic translate method
- """
- try:
- locale = str(QLocale.system().name()).split('_')[0]
- except:
- logger.warning("could not get system locale!")
- print "could not get system locale!"
- locale = "en"
- def tr(self, to=None):
- if not to:
- to = self.locale
- _tr = self.get(to, None)
- if not _tr:
- _tr = self.get("en", None)
- return _tr
diff --git a/src/leap/util/ b/src/leap/util/
deleted file mode 100644
index 15de0561..00000000
--- a/src/leap/util/
+++ /dev/null
@@ -1,40 +0,0 @@
-web related utilities
-class UsageError(Exception):
- """ """
-def get_https_domain_and_port(full_domain):
- """
- returns a tuple with domain and port
- from a full_domain string that can
- contain a colon
- """
- full_domain = unicode(full_domain)
- if full_domain is None:
- return None, None
- https_sch = "https://"
- http_sch = "http://"
- if full_domain.startswith(https_sch):
- full_domain = full_domain.lstrip(https_sch)
- elif full_domain.startswith(http_sch):
- raise UsageError(
- "cannot be called with a domain "
- "that begins with 'http://'")
- domain_split = full_domain.split(':')
- _len = len(domain_split)
- if _len == 1:
- domain, port = full_domain, 443
- elif _len == 2:
- domain, port = domain_split
- else:
- raise UsageError(
- "must be called with one only parameter"
- "in the form domain[:port]")
- return domain, port