summaryrefslogtreecommitdiff
path: root/src/leap/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/leap/util')
-rw-r--r--src/leap/util/__init__.py0
-rw-r--r--src/leap/util/coroutines.py107
-rw-r--r--src/leap/util/fileutil.py113
-rw-r--r--src/leap/util/leap_argparse.py20
-rw-r--r--src/leap/util/test_fileutil.py99
-rw-r--r--src/leap/util/test_leap_argparse.py27
6 files changed, 366 insertions, 0 deletions
diff --git a/src/leap/util/__init__.py b/src/leap/util/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/leap/util/__init__.py
diff --git a/src/leap/util/coroutines.py b/src/leap/util/coroutines.py
new file mode 100644
index 00000000..e7ccfacf
--- /dev/null
+++ b/src/leap/util/coroutines.py
@@ -0,0 +1,107 @@
+# the problem of watching a stdout pipe from
+# openvpn binary: using subprocess and coroutines
+# acting as event consumers
+
+from __future__ import division, print_function
+
+from subprocess import PIPE, Popen
+import sys
+from threading import Thread
+
+ON_POSIX = 'posix' in sys.builtin_module_names
+
+
+#
+# Coroutines goodies
+#
+
+def coroutine(func):
+ def start(*args, **kwargs):
+ cr = func(*args, **kwargs)
+ cr.next()
+ return cr
+ return start
+
+
+@coroutine
+def process_events(callback):
+ """
+ coroutine loop that receives
+ events sent and dispatch the callback.
+ :param callback: callback to be called\
+for each event
+ :type callback: callable
+ """
+ try:
+ while True:
+ m = (yield)
+ if callable(callback):
+ callback(m)
+ else:
+ #XXX log instead
+ print('not a callable passed')
+ except GeneratorExit:
+ return
+
+#
+# Threads
+#
+
+
+def launch_thread(target, args):
+ """
+ launch and demonize thread.
+ :param target: target function that will run in thread
+ :type target: function
+ :param args: args to be passed to thread
+ :type args: list
+ """
+ t = Thread(target=target,
+ args=args)
+ t.daemon = True
+ t.start()
+ return t
+
+
+def watch_output(out, observers):
+ """
+ initializes dict of observer coroutines
+ and pushes lines to each of them as they are received
+ from the watched output.
+ :param out: stdout of a process.
+ :type out: fd
+ :param observers: tuple of coroutines to send data\
+for each event
+ :type ovservers: tuple
+ """
+ observer_dict = dict(((observer, process_events(observer))
+ for observer in observers))
+ for line in iter(out.readline, b''):
+ for obs in observer_dict:
+ observer_dict[obs].send(line)
+ out.close()
+
+
+def spawn_and_watch_process(command, args, observers=None):
+ """
+ spawns a subprocess with command, args, and launch
+ a watcher thread.
+ :param command: command to be executed in the subprocess
+ :type command: str
+ :param args: arguments
+ :type args: list
+ :param observers: tuple of observer functions to be called \
+for each line in the subprocess output.
+ :type observers: tuple
+ :return: a tuple containing the child process instance, and watcher_thread,
+ :rtype: (Subprocess, Thread)
+ """
+ subp = Popen([command] + args,
+ stdout=PIPE,
+ stderr=PIPE,
+ bufsize=1,
+ close_fds=ON_POSIX)
+ watcher = launch_thread(
+ watch_output,
+ (subp.stdout, observers))
+ return subp, watcher
diff --git a/src/leap/util/fileutil.py b/src/leap/util/fileutil.py
new file mode 100644
index 00000000..429e4b12
--- /dev/null
+++ b/src/leap/util/fileutil.py
@@ -0,0 +1,113 @@
+import errno
+from itertools import chain
+import logging
+import os
+import platform
+import stat
+
+
+logger = logging.getLogger()
+
+
+def is_user_executable(fpath):
+ st = os.stat(fpath)
+ return bool(st.st_mode & stat.S_IXUSR)
+
+
+def extend_path():
+ ourplatform = platform.system()
+ if ourplatform == "Linux":
+ return "/usr/local/sbin:/usr/sbin"
+ # XXX add mac / win extended search paths?
+
+
+def which(program):
+ """
+ an implementation of which
+ that extends the path with
+ other locations, like sbin
+ (f.i., openvpn binary is likely to be there)
+ @param program: a string representing the binary we're looking for.
+ """
+ def is_exe(fpath):
+ """
+ check that path exists,
+ it's a file,
+ and is executable by the owner
+ """
+ # we would check for access,
+ # but it's likely that we're
+ # using uid 0 + polkitd
+
+ return os.path.isfile(fpath)\
+ and is_user_executable(fpath)
+
+ def ext_candidates(fpath):
+ yield fpath
+ for ext in os.environ.get("PATHEXT", "").split(os.pathsep):
+ yield fpath + ext
+
+ def iter_path(pathset):
+ """
+ returns iterator with
+ full path for a given path list
+ and the current target bin.
+ """
+ for path in pathset.split(os.pathsep):
+ exe_file = os.path.join(path, program)
+ #print 'file=%s' % exe_file
+ for candidate in ext_candidates(exe_file):
+ if is_exe(candidate):
+ yield candidate
+
+ fpath, fname = os.path.split(program)
+ if fpath:
+ if is_exe(program):
+ return program
+ else:
+ # extended iterator
+ # with extra path
+ extended_path = chain(
+ iter_path(os.environ["PATH"]),
+ iter_path(extend_path()))
+ for candidate in extended_path:
+ if candidate is not None:
+ return candidate
+
+ # sorry bro.
+ return None
+
+
+def mkdir_p(path):
+ """
+ implements mkdir -p functionality
+ """
+ try:
+ os.makedirs(path)
+ except OSError as exc:
+ if exc.errno == errno.EEXIST:
+ pass
+ else:
+ raise
+
+
+def check_and_fix_urw_only(_file):
+ """
+ test for 600 mode and try
+ to set it if anything different found
+ """
+ mode = stat.S_IMODE(
+ os.stat(_file).st_mode)
+
+ if mode != int('600', 8):
+ try:
+ logger.warning(
+ 'bad permission on %s '
+ 'attempting to set 600',
+ _file)
+ os.chmod(_file, stat.S_IRUSR | stat.S_IWUSR)
+ except OSError:
+ logger.error(
+ 'error while trying to chmod 600 %s',
+ _file)
+ raise
diff --git a/src/leap/util/leap_argparse.py b/src/leap/util/leap_argparse.py
new file mode 100644
index 00000000..9c355134
--- /dev/null
+++ b/src/leap/util/leap_argparse.py
@@ -0,0 +1,20 @@
+import argparse
+
+
+def build_parser():
+ epilog = "Copyright 2012 The Leap Project"
+ parser = argparse.ArgumentParser(description="""
+Launches main LEAP Client""", epilog=epilog)
+ parser.add_argument('--debug', action="store_true",
+ help='launches in debug mode')
+ parser.add_argument('--config', metavar="CONFIG FILE", nargs='?',
+ action="store", dest="config_file",
+ type=argparse.FileType('r'),
+ help='optional config file')
+ return parser
+
+
+def init_leapc_args():
+ parser = build_parser()
+ opts = parser.parse_args()
+ return parser, opts
diff --git a/src/leap/util/test_fileutil.py b/src/leap/util/test_fileutil.py
new file mode 100644
index 00000000..849decaf
--- /dev/null
+++ b/src/leap/util/test_fileutil.py
@@ -0,0 +1,99 @@
+import os
+import platform
+import shutil
+import stat
+import tempfile
+import unittest
+
+from leap.util import fileutil
+
+
+class FileUtilTest(unittest.TestCase):
+ """
+ test our file utils
+ """
+
+ def setUp(self):
+ self.system = platform.system()
+ self.create_temp_dir()
+
+ def tearDown(self):
+ self.remove_temp_dir()
+
+ #
+ # helpers
+ #
+
+ def create_temp_dir(self):
+ self.tmpdir = tempfile.mkdtemp()
+
+ def remove_temp_dir(self):
+ shutil.rmtree(self.tmpdir)
+
+ def get_file_path(self, filename):
+ return os.path.join(
+ self.tmpdir,
+ filename)
+
+ def touch_exec_file(self):
+ fp = self.get_file_path('testexec')
+ open(fp, 'w').close()
+ os.chmod(
+ fp,
+ stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
+ return fp
+
+ def get_mode(self, fp):
+ return stat.S_IMODE(os.stat(fp).st_mode)
+
+ #
+ # tests
+ #
+
+ def test_is_user_executable(self):
+ """
+ test that a 700 file
+ is an 700 file. kindda oximoronic, but...
+ """
+ # XXX could check access X_OK
+
+ fp = self.touch_exec_file()
+ mode = self.get_mode(fp)
+ self.assertEqual(mode, int('700', 8))
+
+ def test_which(self):
+ """
+ not a very reliable test,
+ but I cannot think of anything smarter now
+ I guess it's highly improbable that copy
+ command is somewhere else..?
+ """
+ # XXX yep, we can change the syspath
+ # for the test... !
+
+ if self.system == "Linux":
+ self.assertEqual(
+ fileutil.which('cp'),
+ '/bin/cp')
+
+ def test_mkdir_p(self):
+ """
+ test our mkdir -p implementation
+ """
+ testdir = self.get_file_path(
+ os.path.join('test', 'foo', 'bar'))
+ self.assertEqual(os.path.isdir(testdir), False)
+ fileutil.mkdir_p(testdir)
+ self.assertEqual(os.path.isdir(testdir), True)
+
+ def test_check_and_fix_urw_only(self):
+ """
+ test function that fixes perms on
+ files that should be rw only for owner
+ """
+ fp = self.touch_exec_file()
+ mode = self.get_mode(fp)
+ self.assertEqual(mode, int('700', 8))
+ fileutil.check_and_fix_urw_only(fp)
+ mode = self.get_mode(fp)
+ self.assertEqual(mode, int('600', 8))
diff --git a/src/leap/util/test_leap_argparse.py b/src/leap/util/test_leap_argparse.py
new file mode 100644
index 00000000..1442e827
--- /dev/null
+++ b/src/leap/util/test_leap_argparse.py
@@ -0,0 +1,27 @@
+from argparse import Namespace
+import unittest
+
+from leap.util import leap_argparse
+
+
+class LeapArgParseTest(unittest.TestCase):
+ """
+ Test argparse options for eip client
+ """
+
+ def setUp(self):
+ """
+ get the parser
+ """
+ self.parser = leap_argparse.build_parser()
+
+ def test_debug_mode(self):
+ """
+ test debug mode option
+ """
+ opts = self.parser.parse_args(
+ ['--debug'])
+ self.assertEqual(
+ opts,
+ Namespace(config_file=None,
+ debug=True))