From c46d8da153ac658c8bd145376e22b1218db1090a Mon Sep 17 00:00:00 2001 From: kali Date: Sun, 22 Jul 2012 21:10:15 -0700 Subject: initial import --- src/leap/utils/coroutines.py | 107 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 src/leap/utils/coroutines.py (limited to 'src/leap/utils/coroutines.py') diff --git a/src/leap/utils/coroutines.py b/src/leap/utils/coroutines.py new file mode 100644 index 00000000..5e25eb63 --- /dev/null +++ b/src/leap/utils/coroutines.py @@ -0,0 +1,107 @@ +# the problem of watching a stdout pipe from +# openvpn binary: using subprocess and coroutines +# acting as event consumers + +from __future__ import division, print_function + +from subprocess import PIPE, Popen +import sys +from threading import Thread + +ON_POSIX = 'posix' in sys.builtin_module_names + + +# +# Coroutines goodies +# + +def coroutine(func): + def start(*args, **kwargs): + cr = func(*args, **kwargs) + cr.next() + return cr + return start + + +@coroutine +def process_events(callback): + """ + coroutine loop that receives + events sent and dispatch the callback. + :param callback: callback to be called\ +for each event + :type callback: callable + """ + try: + while True: + m = (yield) + if callable(callback): + callback(m) + else: + #XXX log instead + print('not a callable passed') + except GeneratorExit: + return + +# +# Threads +# + + +def launch_thread(target, args): + """ + launch and demonize thread. + :param target: target function that will run in thread + :type target: function + :param args: args to be passed to thread + :type args: list + """ + t = Thread(target=target, + args=args) + t.daemon = True + t.start() + return t + + +def watch_output(out, observers): + """ + initializes dict of observer coroutines + and pushes lines to each of them as they are received + from the watched output. + :param out: stdout of a process. + :type out: fd + :param observers: tuple of coroutines to send data\ +for each event + :type ovservers: tuple + """ + observer_dict = {observer: process_events(observer) + for observer in observers} + for line in iter(out.readline, b''): + for obs in observer_dict: + observer_dict[obs].send(line) + out.close() + + +def spawn_and_watch_process(command, args, observers=None): + """ + spawns a subprocess with command, args, and launch + a watcher thread. + :param command: command to be executed in the subprocess + :type command: str + :param args: arguments + :type args: list + :param observers: tuple of observer functions to be called \ +for each line in the subprocess output. + :type observers: tuple + :return: a tuple containing the child process instance, and watcher_thread, + :rtype: (Subprocess, Thread) + """ + subp = Popen([command] + args, + stdout=PIPE, + stderr=PIPE, + bufsize=1, + close_fds=ON_POSIX) + watcher = launch_thread( + watch_output, + (subp.stdout, observers)) + return subp, watcher -- cgit v1.2.3