diff options
author | antialias <antialias@leap.se> | 2012-08-14 13:41:01 -0700 |
---|---|---|
committer | antialias <antialias@leap.se> | 2012-08-14 13:41:01 -0700 |
commit | bc44a763808241ec4e732e7b3bbd819490c88cbb (patch) | |
tree | a55f7c670dbd051981ce65c37da5320a5e980990 /src/leap/util/coroutines.py | |
parent | e1103904fbdd9b54b53075956c279271c17e9a8f (diff) | |
parent | b0ef9f98d8384cb68e59fac91142e5ac9f2ab47c (diff) |
Merge branch 'develop' of ssh://leap.se:4422/leap-client into refactor
Diffstat (limited to 'src/leap/util/coroutines.py')
-rw-r--r-- | src/leap/util/coroutines.py | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/src/leap/util/coroutines.py b/src/leap/util/coroutines.py new file mode 100644 index 00000000..e7ccfacf --- /dev/null +++ b/src/leap/util/coroutines.py @@ -0,0 +1,107 @@ +# the problem of watching a stdout pipe from +# openvpn binary: using subprocess and coroutines +# acting as event consumers + +from __future__ import division, print_function + +from subprocess import PIPE, Popen +import sys +from threading import Thread + +ON_POSIX = 'posix' in sys.builtin_module_names + + +# +# Coroutines goodies +# + +def coroutine(func): + def start(*args, **kwargs): + cr = func(*args, **kwargs) + cr.next() + return cr + return start + + +@coroutine +def process_events(callback): + """ + coroutine loop that receives + events sent and dispatch the callback. + :param callback: callback to be called\ +for each event + :type callback: callable + """ + try: + while True: + m = (yield) + if callable(callback): + callback(m) + else: + #XXX log instead + print('not a callable passed') + except GeneratorExit: + return + +# +# Threads +# + + +def launch_thread(target, args): + """ + launch and demonize thread. + :param target: target function that will run in thread + :type target: function + :param args: args to be passed to thread + :type args: list + """ + t = Thread(target=target, + args=args) + t.daemon = True + t.start() + return t + + +def watch_output(out, observers): + """ + initializes dict of observer coroutines + and pushes lines to each of them as they are received + from the watched output. + :param out: stdout of a process. + :type out: fd + :param observers: tuple of coroutines to send data\ +for each event + :type ovservers: tuple + """ + observer_dict = dict(((observer, process_events(observer)) + for observer in observers)) + for line in iter(out.readline, b''): + for obs in observer_dict: + observer_dict[obs].send(line) + out.close() + + +def spawn_and_watch_process(command, args, observers=None): + """ + spawns a subprocess with command, args, and launch + a watcher thread. + :param command: command to be executed in the subprocess + :type command: str + :param args: arguments + :type args: list + :param observers: tuple of observer functions to be called \ +for each line in the subprocess output. + :type observers: tuple + :return: a tuple containing the child process instance, and watcher_thread, + :rtype: (Subprocess, Thread) + """ + subp = Popen([command] + args, + stdout=PIPE, + stderr=PIPE, + bufsize=1, + close_fds=ON_POSIX) + watcher = launch_thread( + watch_output, + (subp.stdout, observers)) + return subp, watcher |