summaryrefslogtreecommitdiff
path: root/src/leap/utils/coroutines.py
diff options
context:
space:
mode:
authorkali <kali@leap.se>2012-07-22 21:10:15 -0700
committerkali <kali@leap.se>2012-07-22 21:10:15 -0700
commitc46d8da153ac658c8bd145376e22b1218db1090a (patch)
tree0943a4a866d9f3b1bc590c1c23f810ca13635f9e /src/leap/utils/coroutines.py
initial import
Diffstat (limited to 'src/leap/utils/coroutines.py')
-rw-r--r--src/leap/utils/coroutines.py107
1 files changed, 107 insertions, 0 deletions
diff --git a/src/leap/utils/coroutines.py b/src/leap/utils/coroutines.py
new file mode 100644
index 00000000..5e25eb63
--- /dev/null
+++ b/src/leap/utils/coroutines.py
@@ -0,0 +1,107 @@
+# the problem of watching a stdout pipe from
+# openvpn binary: using subprocess and coroutines
+# acting as event consumers
+
+from __future__ import division, print_function
+
+from subprocess import PIPE, Popen
+import sys
+from threading import Thread
+
+ON_POSIX = 'posix' in sys.builtin_module_names
+
+
+#
+# Coroutines goodies
+#
+
+def coroutine(func):
+ def start(*args, **kwargs):
+ cr = func(*args, **kwargs)
+ cr.next()
+ return cr
+ return start
+
+
+@coroutine
+def process_events(callback):
+ """
+ coroutine loop that receives
+ events sent and dispatch the callback.
+ :param callback: callback to be called\
+for each event
+ :type callback: callable
+ """
+ try:
+ while True:
+ m = (yield)
+ if callable(callback):
+ callback(m)
+ else:
+ #XXX log instead
+ print('not a callable passed')
+ except GeneratorExit:
+ return
+
+#
+# Threads
+#
+
+
+def launch_thread(target, args):
+ """
+ launch and demonize thread.
+ :param target: target function that will run in thread
+ :type target: function
+ :param args: args to be passed to thread
+ :type args: list
+ """
+ t = Thread(target=target,
+ args=args)
+ t.daemon = True
+ t.start()
+ return t
+
+
+def watch_output(out, observers):
+ """
+ initializes dict of observer coroutines
+ and pushes lines to each of them as they are received
+ from the watched output.
+ :param out: stdout of a process.
+ :type out: fd
+ :param observers: tuple of coroutines to send data\
+for each event
+ :type ovservers: tuple
+ """
+ observer_dict = {observer: process_events(observer)
+ for observer in observers}
+ for line in iter(out.readline, b''):
+ for obs in observer_dict:
+ observer_dict[obs].send(line)
+ out.close()
+
+
+def spawn_and_watch_process(command, args, observers=None):
+ """
+ spawns a subprocess with command, args, and launch
+ a watcher thread.
+ :param command: command to be executed in the subprocess
+ :type command: str
+ :param args: arguments
+ :type args: list
+ :param observers: tuple of observer functions to be called \
+for each line in the subprocess output.
+ :type observers: tuple
+ :return: a tuple containing the child process instance, and watcher_thread,
+ :rtype: (Subprocess, Thread)
+ """
+ subp = Popen([command] + args,
+ stdout=PIPE,
+ stderr=PIPE,
+ bufsize=1,
+ close_fds=ON_POSIX)
+ watcher = launch_thread(
+ watch_output,
+ (subp.stdout, observers))
+ return subp, watcher