diff options
Diffstat (limited to 'src/leap/util/coroutines.py')
| -rw-r--r-- | src/leap/util/coroutines.py | 107 | 
1 files changed, 107 insertions, 0 deletions
diff --git a/src/leap/util/coroutines.py b/src/leap/util/coroutines.py new file mode 100644 index 00000000..5e25eb63 --- /dev/null +++ b/src/leap/util/coroutines.py @@ -0,0 +1,107 @@ +# the problem of watching a stdout pipe from +# openvpn binary: using subprocess and coroutines +# acting as event consumers + +from __future__ import division, print_function + +from subprocess import PIPE, Popen +import sys +from threading import Thread + +ON_POSIX = 'posix' in sys.builtin_module_names + + +# +# Coroutines goodies +# + +def coroutine(func): +    def start(*args, **kwargs): +        cr = func(*args, **kwargs) +        cr.next() +        return cr +    return start + + +@coroutine +def process_events(callback): +    """ +    coroutine loop that receives +    events sent and dispatch the callback. +    :param callback: callback to be called\ +for each event +    :type callback: callable +    """ +    try: +        while True: +            m = (yield) +            if callable(callback): +                callback(m) +            else: +                #XXX log instead +                print('not a callable passed') +    except GeneratorExit: +        return + +# +# Threads +# + + +def launch_thread(target, args): +    """ +    launch and demonize thread. +    :param target: target function that will run in thread +    :type target: function +    :param args: args to be passed to thread +    :type args: list +    """ +    t = Thread(target=target, +               args=args) +    t.daemon = True +    t.start() +    return t + + +def watch_output(out, observers): +    """ +    initializes dict of observer coroutines +    and pushes lines to each of them as they are received +    from the watched output. +    :param out: stdout of a process. +    :type out: fd +    :param observers: tuple of coroutines to send data\ +for each event +    :type ovservers: tuple +    """ +    observer_dict = {observer: process_events(observer) +                     for observer in observers} +    for line in iter(out.readline, b''): +        for obs in observer_dict: +            observer_dict[obs].send(line) +    out.close() + + +def spawn_and_watch_process(command, args, observers=None): +    """ +    spawns a subprocess with command, args, and launch +    a watcher thread. +    :param command: command to be executed in the subprocess +    :type command: str +    :param args: arguments +    :type args: list +    :param observers: tuple of observer functions to be called \ +for each line in the subprocess output. +    :type observers: tuple +    :return: a tuple containing the child process instance, and watcher_thread, +    :rtype: (Subprocess, Thread) +    """ +    subp = Popen([command] + args, +                 stdout=PIPE, +                 stderr=PIPE, +                 bufsize=1, +                 close_fds=ON_POSIX) +    watcher = launch_thread( +        watch_output, +        (subp.stdout, observers)) +    return subp, watcher  | 
