From 6e197c1353c788109df07ee6d1242a5c2327e8f9 Mon Sep 17 00:00:00 2001 From: kali Date: Wed, 1 Aug 2012 09:58:08 +0900 Subject: fileutil.which implementation --- src/leap/util/coroutines.py | 107 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 src/leap/util/coroutines.py (limited to 'src/leap/util/coroutines.py') diff --git a/src/leap/util/coroutines.py b/src/leap/util/coroutines.py new file mode 100644 index 00000000..5e25eb63 --- /dev/null +++ b/src/leap/util/coroutines.py @@ -0,0 +1,107 @@ +# the problem of watching a stdout pipe from +# openvpn binary: using subprocess and coroutines +# acting as event consumers + +from __future__ import division, print_function + +from subprocess import PIPE, Popen +import sys +from threading import Thread + +ON_POSIX = 'posix' in sys.builtin_module_names + + +# +# Coroutines goodies +# + +def coroutine(func): + def start(*args, **kwargs): + cr = func(*args, **kwargs) + cr.next() + return cr + return start + + +@coroutine +def process_events(callback): + """ + coroutine loop that receives + events sent and dispatch the callback. + :param callback: callback to be called\ +for each event + :type callback: callable + """ + try: + while True: + m = (yield) + if callable(callback): + callback(m) + else: + #XXX log instead + print('not a callable passed') + except GeneratorExit: + return + +# +# Threads +# + + +def launch_thread(target, args): + """ + launch and demonize thread. + :param target: target function that will run in thread + :type target: function + :param args: args to be passed to thread + :type args: list + """ + t = Thread(target=target, + args=args) + t.daemon = True + t.start() + return t + + +def watch_output(out, observers): + """ + initializes dict of observer coroutines + and pushes lines to each of them as they are received + from the watched output. + :param out: stdout of a process. + :type out: fd + :param observers: tuple of coroutines to send data\ +for each event + :type ovservers: tuple + """ + observer_dict = {observer: process_events(observer) + for observer in observers} + for line in iter(out.readline, b''): + for obs in observer_dict: + observer_dict[obs].send(line) + out.close() + + +def spawn_and_watch_process(command, args, observers=None): + """ + spawns a subprocess with command, args, and launch + a watcher thread. + :param command: command to be executed in the subprocess + :type command: str + :param args: arguments + :type args: list + :param observers: tuple of observer functions to be called \ +for each line in the subprocess output. + :type observers: tuple + :return: a tuple containing the child process instance, and watcher_thread, + :rtype: (Subprocess, Thread) + """ + subp = Popen([command] + args, + stdout=PIPE, + stderr=PIPE, + bufsize=1, + close_fds=ON_POSIX) + watcher = launch_thread( + watch_output, + (subp.stdout, observers)) + return subp, watcher -- cgit v1.2.3 From 021e7ea900c64af9577412f11349f69e01634c0c Mon Sep 17 00:00:00 2001 From: antialias Date: Tue, 25 Sep 2012 15:48:40 -0400 Subject: fixed typo. --- src/leap/util/coroutines.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/leap/util/coroutines.py') diff --git a/src/leap/util/coroutines.py b/src/leap/util/coroutines.py index e7ccfacf..b9d0a98b 100644 --- a/src/leap/util/coroutines.py +++ b/src/leap/util/coroutines.py @@ -72,7 +72,7 @@ def watch_output(out, observers): :type out: fd :param observers: tuple of coroutines to send data\ for each event - :type ovservers: tuple + :type observers: tuple """ observer_dict = dict(((observer, process_events(observer)) for observer in observers)) -- cgit v1.2.3 From 6728eb9afb21bad867e4052a6190a9bdb34c928a Mon Sep 17 00:00:00 2001 From: kali Date: Mon, 8 Oct 2012 07:50:24 +0900 Subject: popup dialog error when network error happens we are shutting down for now. we should be acting upon failures in the near future. lowered the recurrent checks interval to 10 seconds. --- src/leap/util/coroutines.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/leap/util/coroutines.py') diff --git a/src/leap/util/coroutines.py b/src/leap/util/coroutines.py index b9d0a98b..0657fc04 100644 --- a/src/leap/util/coroutines.py +++ b/src/leap/util/coroutines.py @@ -4,10 +4,13 @@ from __future__ import division, print_function +import logging from subprocess import PIPE, Popen import sys from threading import Thread +logger = logging.getLogger(__name__) + ON_POSIX = 'posix' in sys.builtin_module_names @@ -38,8 +41,7 @@ for each event if callable(callback): callback(m) else: - #XXX log instead - print('not a callable passed') + logger.debug('not a callable passed') except GeneratorExit: return -- cgit v1.2.3