From 4c5f597132a0db4418b18b42385dde7f860afcb0 Mon Sep 17 00:00:00 2001 From: Ben Carrillo Date: Tue, 5 Feb 2013 23:00:53 +0900 Subject: initial commit --- test.py | 1312 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1312 insertions(+) create mode 100644 test.py (limited to 'test.py') diff --git a/test.py b/test.py new file mode 100644 index 0000000..efa84d4 --- /dev/null +++ b/test.py @@ -0,0 +1,1312 @@ +# -*- coding: utf8 -*- + +import os +import unittest +import tempfile +import sys +import sh +import platform + +IS_OSX = platform.system() == "Darwin" +IS_PY3 = sys.version_info[0] == 3 +if IS_PY3: + unicode = str + python = sh.Command(sh.which("python%d.%d" % sys.version_info[:2])) +else: + from sh import python + + +THIS_DIR = os.path.dirname(os.path.abspath(__file__)) + +skipUnless = getattr(unittest, "skipUnless", None) +if not skipUnless: + def skipUnless(*args, **kwargs): + def wrapper(thing): return thing + return wrapper + +requires_posix = skipUnless(os.name == "posix", "Requires POSIX") + + + +def create_tmp_test(code): + py = tempfile.NamedTemporaryFile() + if IS_PY3: code = bytes(code, "UTF-8") + py.write(code) + py.flush() + # we don't explicitly close, because close will remove the file, and we + # don't want that until the test case is done. so we let the gc close it + # when it goes out of scope + return py + + + +@requires_posix +class Basic(unittest.TestCase): + + def test_print_command(self): + from sh import ls, which + actual_location = which("ls") + out = str(ls) + self.assertEqual(out, actual_location) + + def test_unicode_arg(self): + from sh import echo + + test = "漢字" + if not IS_PY3: test = test.decode("utf8") + + p = echo(test).strip() + self.assertEqual(test, p) + + def test_number_arg(self): + py = create_tmp_test(""" +from optparse import OptionParser +parser = OptionParser() +options, args = parser.parse_args() +print(args[0]) +""") + + out = python(py.name, 3).strip() + self.assertEqual(out, "3") + + def test_exit_code(self): + from sh import ls, ErrorReturnCode + self.assertEqual(ls("/").exit_code, 0) + self.assertRaises(ErrorReturnCode, ls, "/aofwje/garogjao4a/eoan3on") + + def test_glob_warning(self): + from sh import ls + from glob import glob + import warnings + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + ls(glob("ofjaoweijfaowe")) + + self.assertTrue(len(w) == 1) + self.assertTrue(issubclass(w[-1].category, UserWarning)) + self.assertTrue("glob" in str(w[-1].message)) + + def test_stdin_from_string(self): + from sh import sed + self.assertEqual(sed(_in="test", e="s/test/lol/").strip(), "lol") + + def test_ok_code(self): + from sh import ls, ErrorReturnCode_1, ErrorReturnCode_2 + + exc_to_test = ErrorReturnCode_2 + code_to_pass = 2 + if IS_OSX: + exc_to_test = ErrorReturnCode_1 + code_to_pass = 1 + self.assertRaises(exc_to_test, ls, "/aofwje/garogjao4a/eoan3on") + + ls("/aofwje/garogjao4a/eoan3on", _ok_code=code_to_pass) + ls("/aofwje/garogjao4a/eoan3on", _ok_code=[code_to_pass]) + + def test_quote_escaping(self): + py = create_tmp_test(""" +from optparse import OptionParser +parser = OptionParser() +options, args = parser.parse_args() +print(args) +""") + out = python(py.name, "one two three").strip() + self.assertEqual(out, "['one two three']") + + out = python(py.name, "one \"two three").strip() + self.assertEqual(out, "['one \"two three']") + + out = python(py.name, "one", "two three").strip() + self.assertEqual(out, "['one', 'two three']") + + out = python(py.name, "one", "two \"haha\" three").strip() + self.assertEqual(out, "['one', 'two \"haha\" three']") + + out = python(py.name, "one two's three").strip() + self.assertEqual(out, "[\"one two's three\"]") + + out = python(py.name, 'one two\'s three').strip() + self.assertEqual(out, "[\"one two's three\"]") + + def test_multiple_pipes(self): + from sh import tr, python + import time + + py = create_tmp_test(""" +import sys +import os +import time + +for l in "andrew": + print(l) + time.sleep(.2) +""") + + class Derp(object): + def __init__(self): + self.times = [] + self.stdout = [] + self.last_received = None + + def agg(self, line): + self.stdout.append(line.strip()) + now = time.time() + if self.last_received: self.times.append(now - self.last_received) + self.last_received = now + + derp = Derp() + + p = tr( + tr( + tr( + python(py.name, _piped=True), + "aw", "wa", _piped=True), + "ne", "en", _piped=True), + "dr", "rd", _out=derp.agg) + + p.wait() + self.assertEqual("".join(derp.stdout), "werdna") + self.assertTrue(all([t > .15 for t in derp.times])) + + + def test_manual_stdin_string(self): + from sh import tr + + out = tr("[:lower:]", "[:upper:]", _in="andrew").strip() + self.assertEqual(out, "ANDREW") + + + def test_manual_stdin_iterable(self): + from sh import tr + + test = ["testing\n", "herp\n", "derp\n"] + out = tr("[:lower:]", "[:upper:]", _in=test) + + match = "".join([t.upper() for t in test]) + self.assertEqual(out, match) + + + def test_manual_stdin_file(self): + from sh import tr + import tempfile + + test_string = "testing\nherp\nderp\n" + + stdin = tempfile.NamedTemporaryFile() + stdin.write(test_string.encode()) + stdin.flush() + stdin.seek(0) + + out = tr("[:lower:]", "[:upper:]", _in=stdin) + + self.assertEqual(out, test_string.upper()) + + + def test_manual_stdin_queue(self): + from sh import tr + try: from Queue import Queue, Empty + except ImportError: from queue import Queue, Empty + + test = ["testing\n", "herp\n", "derp\n"] + + q = Queue() + for t in test: q.put(t) + q.put(None) # EOF + + out = tr("[:lower:]", "[:upper:]", _in=q) + + match = "".join([t.upper() for t in test]) + self.assertEqual(out, match) + + + def test_environment(self): + import os + + env = {"HERP": "DERP"} + + py = create_tmp_test(""" +import os + +osx_cruft = ["__CF_USER_TEXT_ENCODING", "__PYVENV_LAUNCHER__"] +for key in osx_cruft: + try: del os.environ[key] + except: pass +print(os.environ["HERP"] + " " + str(len(os.environ))) +""") + out = python(py.name, _env=env).strip() + self.assertEqual(out, "DERP 1") + + py = create_tmp_test(""" +import os, sys +sys.path.insert(0, os.getcwd()) +import sh +osx_cruft = ["__CF_USER_TEXT_ENCODING", "__PYVENV_LAUNCHER__"] +for key in osx_cruft: + try: del os.environ[key] + except: pass +print(sh.HERP + " " + str(len(os.environ))) +""") + out = python(py.name, _env=env, _cwd=THIS_DIR).strip() + self.assertEqual(out, "DERP 1") + + + def test_which(self): + from sh import which, ls + self.assertEqual(which("fjoawjefojawe"), None) + self.assertEqual(which("ls"), str(ls)) + + + def test_foreground(self): + return + raise NotImplementedError + + def test_no_arg(self): + import pwd + from sh import whoami + u1 = whoami().strip() + u2 = pwd.getpwuid(os.geteuid())[0] + self.assertEqual(u1, u2) + + def test_incompatible_special_args(self): + from sh import ls + self.assertRaises(TypeError, ls, _iter=True, _piped=True) + + + def test_exception(self): + from sh import ls, ErrorReturnCode_1, ErrorReturnCode_2 + + exc_to_test = ErrorReturnCode_2 + if IS_OSX: exc_to_test = ErrorReturnCode_1 + self.assertRaises(exc_to_test, ls, "/aofwje/garogjao4a/eoan3on") + + + def test_command_not_found(self): + from sh import CommandNotFound + + def do_import(): from sh import aowjgoawjoeijaowjellll + self.assertRaises(ImportError, do_import) + + def do_import(): + import sh + sh.awoefaowejfw + self.assertRaises(CommandNotFound, do_import) + + + def test_command_wrapper_equivalence(self): + from sh import Command, ls, which + + self.assertEqual(Command(which("ls")), ls) + + + def test_multiple_args_short_option(self): + py = create_tmp_test(""" +from optparse import OptionParser +parser = OptionParser() +parser.add_option("-l", dest="long_option") +options, args = parser.parse_args() +print(len(options.long_option.split())) +""") + num_args = int(python(py.name, l="one two three")) + self.assertEqual(num_args, 3) + + num_args = int(python(py.name, "-l", "one's two's three's")) + self.assertEqual(num_args, 3) + + + def test_multiple_args_long_option(self): + py = create_tmp_test(""" +from optparse import OptionParser +parser = OptionParser() +parser.add_option("-l", "--long-option", dest="long_option") +options, args = parser.parse_args() +print(len(options.long_option.split())) +""") + num_args = int(python(py.name, long_option="one two three")) + self.assertEqual(num_args, 3) + + num_args = int(python(py.name, "--long-option", "one's two's three's")) + self.assertEqual(num_args, 3) + + + def test_short_bool_option(self): + py = create_tmp_test(""" +from optparse import OptionParser +parser = OptionParser() +parser.add_option("-s", action="store_true", default=False, dest="short_option") +options, args = parser.parse_args() +print(options.short_option) +""") + self.assertTrue(python(py.name, s=True).strip() == "True") + self.assertTrue(python(py.name, s=False).strip() == "False") + self.assertTrue(python(py.name).strip() == "False") + + + def test_long_bool_option(self): + py = create_tmp_test(""" +from optparse import OptionParser +parser = OptionParser() +parser.add_option("-l", "--long-option", action="store_true", default=False, dest="long_option") +options, args = parser.parse_args() +print(options.long_option) +""") + self.assertTrue(python(py.name, long_option=True).strip() == "True") + self.assertTrue(python(py.name).strip() == "False") + + + def test_composition(self): + from sh import ls, wc + c1 = int(wc(ls("-A1"), l=True)) + c2 = len(os.listdir(".")) + self.assertEqual(c1, c2) + + def test_incremental_composition(self): + from sh import ls, wc + c1 = int(wc(ls("-A1", _piped=True), l=True).strip()) + c2 = len(os.listdir(".")) + if c1 != c2: + with open("/tmp/fail", "a") as h: h.write("FUCK\n") + self.assertEqual(c1, c2) + + + def test_short_option(self): + from sh import sh + s1 = sh(c="echo test").strip() + s2 = "test" + self.assertEqual(s1, s2) + + + def test_long_option(self): + py = create_tmp_test(""" +from optparse import OptionParser +parser = OptionParser() +parser.add_option("-l", "--long-option", action="store", default="", dest="long_option") +options, args = parser.parse_args() +print(options.long_option.upper()) +""") + self.assertTrue(python(py.name, long_option="testing").strip() == "TESTING") + self.assertTrue(python(py.name).strip() == "") + + + def test_command_wrapper(self): + from sh import Command, which + + ls = Command(which("ls")) + wc = Command(which("wc")) + + c1 = int(wc(ls("-A1"), l=True)) + c2 = len(os.listdir(".")) + + self.assertEqual(c1, c2) + + + def test_background(self): + from sh import sleep + import time + + start = time.time() + sleep_time = .5 + p = sleep(sleep_time, _bg=True) + + now = time.time() + self.assertTrue(now - start < sleep_time) + + p.wait() + now = time.time() + self.assertTrue(now - start > sleep_time) + + + def test_background_exception(self): + from sh import ls, ErrorReturnCode_1, ErrorReturnCode_2 + p = ls("/ofawjeofj", _bg=True) # should not raise + + exc_to_test = ErrorReturnCode_2 + if IS_OSX: exc_to_test = ErrorReturnCode_1 + self.assertRaises(exc_to_test, p.wait) # should raise + + + def test_with_context(self): + from sh import whoami + import getpass + + py = create_tmp_test(""" +import sys +import os +import subprocess + +print("with_context") +subprocess.Popen(sys.argv[1:], shell=False).wait() +""") + + cmd1 = python.bake(py.name, _with=True) + with cmd1: + out = whoami() + self.assertTrue("with_context" in out) + self.assertTrue(getpass.getuser() in out) + + + def test_with_context_args(self): + from sh import whoami + import getpass + + py = create_tmp_test(""" +import sys +import os +import subprocess +from optparse import OptionParser + +parser = OptionParser() +parser.add_option("-o", "--opt", action="store_true", default=False, dest="opt") +options, args = parser.parse_args() + +if options.opt: + subprocess.Popen(args[0], shell=False).wait() +""") + with python(py.name, opt=True, _with=True): + out = whoami() + self.assertTrue(getpass.getuser() == out.strip()) + + + with python(py.name, _with=True): + out = whoami() + self.assertTrue(out == "") + + + + def test_err_to_out(self): + py = create_tmp_test(""" +import sys +import os + +sys.stdout.write("stdout") +sys.stdout.flush() +sys.stderr.write("stderr") +sys.stderr.flush() +""") + stdout = python(py.name, _err_to_out=True) + self.assertTrue(stdout == "stdoutstderr") + + + + def test_out_redirection(self): + import tempfile + + py = create_tmp_test(""" +import sys +import os + +sys.stdout.write("stdout") +sys.stderr.write("stderr") +""") + + file_obj = tempfile.TemporaryFile() + out = python(py.name, _out=file_obj) + + self.assertTrue(len(out) == 0) + + file_obj.seek(0) + actual_out = file_obj.read() + file_obj.close() + + self.assertTrue(len(actual_out) != 0) + + + # test with tee + file_obj = tempfile.TemporaryFile() + out = python(py.name, _out=file_obj, _tee=True) + + self.assertTrue(len(out) != 0) + + file_obj.seek(0) + actual_out = file_obj.read() + file_obj.close() + + self.assertTrue(len(actual_out) != 0) + + + + def test_err_redirection(self): + import tempfile + + py = create_tmp_test(""" +import sys +import os + +sys.stdout.write("stdout") +sys.stderr.write("stderr") +""") + file_obj = tempfile.TemporaryFile() + p = python(py.name, _err=file_obj) + + file_obj.seek(0) + stderr = file_obj.read().decode() + file_obj.close() + + self.assertTrue(p.stdout == b"stdout") + self.assertTrue(stderr == "stderr") + self.assertTrue(len(p.stderr) == 0) + + # now with tee + file_obj = tempfile.TemporaryFile() + p = python(py.name, _err=file_obj, _tee="err") + + file_obj.seek(0) + stderr = file_obj.read().decode() + file_obj.close() + + self.assertTrue(p.stdout == b"stdout") + self.assertTrue(stderr == "stderr") + self.assertTrue(len(p.stderr) != 0) + + + + def test_err_redirection_actual_file(self): + import tempfile + file_obj = tempfile.NamedTemporaryFile() + py = create_tmp_test(""" +import sys +import os + +sys.stdout.write("stdout") +sys.stderr.write("stderr") +""") + stdout = python(py.name, _err=file_obj.name, u=True).wait() + file_obj.seek(0) + stderr = file_obj.read().decode() + file_obj.close() + self.assertTrue(stdout == "stdout") + self.assertTrue(stderr == "stderr") + + def test_subcommand_and_bake(self): + from sh import ls + import getpass + + py = create_tmp_test(""" +import sys +import os +import subprocess + +print("subcommand") +subprocess.Popen(sys.argv[1:], shell=False).wait() +""") + + cmd1 = python.bake(py.name) + out = cmd1.whoami() + self.assertTrue("subcommand" in out) + self.assertTrue(getpass.getuser() in out) + + + def test_multiple_bakes(self): + from sh import whoami + import getpass + + py = create_tmp_test(""" +import sys +import subprocess +subprocess.Popen(sys.argv[1:], shell=False).wait() +""") + + out = python.bake(py.name).bake("whoami")() + self.assertTrue(getpass.getuser() == out.strip()) + + + + def test_bake_args_come_first(self): + from sh import ls + ls = ls.bake(h=True) + + ran = ls("-la").ran + ft = ran.index("-h") + self.assertTrue("-la" in ran[ft:]) + + def test_output_equivalence(self): + from sh import whoami + + iam1 = whoami() + iam2 = whoami() + + self.assertEqual(iam1, iam2) + + + def test_stdout_callback(self): + py = create_tmp_test(""" +import sys +import os + +for i in range(5): print(i) +""") + stdout = [] + def agg(line): + stdout.append(line) + + p = python(py.name, _out=agg, u=True) + p.wait() + + self.assertTrue(len(stdout) == 5) + + + + def test_stdout_callback_no_wait(self): + import time + + py = create_tmp_test(""" +import sys +import os +import time + +for i in range(5): + print(i) + time.sleep(.5) +""") + + stdout = [] + def agg(line): stdout.append(line) + + p = python(py.name, _out=agg, u=True) + + # we give a little pause to make sure that the NamedTemporaryFile + # exists when the python process actually starts + time.sleep(.5) + + self.assertTrue(len(stdout) != 5) + + + + def test_stdout_callback_line_buffered(self): + py = create_tmp_test(""" +import sys +import os + +for i in range(5): print("herpderp") +""") + + stdout = [] + def agg(line): stdout.append(line) + + p = python(py.name, _out=agg, _out_bufsize=1, u=True) + p.wait() + + self.assertTrue(len(stdout) == 5) + + + + def test_stdout_callback_line_unbuffered(self): + py = create_tmp_test(""" +import sys +import os + +for i in range(5): print("herpderp") +""") + + stdout = [] + def agg(char): stdout.append(char) + + p = python(py.name, _out=agg, _out_bufsize=0, u=True) + p.wait() + + # + 5 newlines + self.assertTrue(len(stdout) == (len("herpderp") * 5 + 5)) + + + def test_stdout_callback_buffered(self): + py = create_tmp_test(""" +import sys +import os + +for i in range(5): sys.stdout.write("herpderp") +""") + + stdout = [] + def agg(chunk): stdout.append(chunk) + + p = python(py.name, _out=agg, _out_bufsize=4, u=True) + p.wait() + + self.assertTrue(len(stdout) == (len("herp")/2 * 5)) + + + + def test_stdout_callback_with_input(self): + py = create_tmp_test(""" +import sys +import os +IS_PY3 = sys.version_info[0] == 3 +if IS_PY3: raw_input = input + +for i in range(5): print(str(i)) +derp = raw_input("herp? ") +print(derp) +""") + + def agg(line, stdin): + if line.strip() == "4": stdin.put("derp\n") + + p = python(py.name, _out=agg, u=True, _tee=True) + p.wait() + + self.assertTrue("derp" in p) + + + + def test_stdout_callback_exit(self): + py = create_tmp_test(""" +import sys +import os + +for i in range(5): print(i) +""") + + stdout = [] + def agg(line): + line = line.strip() + stdout.append(line) + if line == "2": return True + + p = python(py.name, _out=agg, u=True, _tee=True) + p.wait() + + self.assertTrue("4" in p) + self.assertTrue("4" not in stdout) + + + + def test_stdout_callback_terminate(self): + import signal + py = create_tmp_test(""" +import sys +import os +import time + +for i in range(5): + print(i) + time.sleep(.5) +""") + + stdout = [] + def agg(line, stdin, process): + line = line.strip() + stdout.append(line) + if line == "3": + process.terminate() + return True + + p = python(py.name, _out=agg, u=True) + p.wait() + + self.assertEqual(p.process.exit_code, -signal.SIGTERM) + self.assertTrue("4" not in p) + self.assertTrue("4" not in stdout) + + + + def test_stdout_callback_kill(self): + import signal + + py = create_tmp_test(""" +import sys +import os +import time + +for i in range(5): + print(i) + time.sleep(.5) +""") + + stdout = [] + def agg(line, stdin, process): + line = line.strip() + stdout.append(line) + if line == "3": + process.kill() + return True + + p = python(py.name, _out=agg, u=True) + p.wait() + + self.assertEqual(p.process.exit_code, -signal.SIGKILL) + self.assertTrue("4" not in p) + self.assertTrue("4" not in stdout) + + def test_general_signal(self): + import signal + from signal import SIGINT + + py = create_tmp_test(""" +import sys +import os +import time +import signal + +def sig_handler(sig, frame): + print(10) + exit(0) + +signal.signal(signal.SIGINT, sig_handler) + +for i in range(5): + print(i) + sys.stdout.flush() + time.sleep(0.5) +""") + + stdout = [] + def agg(line, stdin, process): + line = line.strip() + stdout.append(line) + if line == "3": + process.signal(SIGINT) + return True + + p = python(py.name, _out=agg, _tee=True) + p.wait() + + self.assertEqual(p.process.exit_code, 0) + self.assertEqual(p, "0\n1\n2\n3\n10\n") + + + def test_iter_generator(self): + py = create_tmp_test(""" +import sys +import os +import time + +for i in range(42): + print(i) + sys.stdout.flush() +""") + + out = [] + for line in python(py.name, _iter=True): + out.append(int(line.strip())) + self.assertTrue(len(out) == 42 and sum(out) == 861) + + + def test_nonblocking_iter(self): + import tempfile + from sh import tail + from errno import EWOULDBLOCK + + tmp = tempfile.NamedTemporaryFile() + for line in tail("-f", tmp.name, _iter_noblock=True): break + self.assertEqual(line, EWOULDBLOCK) + + + def test_for_generator_to_err(self): + py = create_tmp_test(""" +import sys +import os + +for i in range(42): + sys.stderr.write(str(i)+"\\n") +""") + + out = [] + for line in python(py.name, _iter="err", u=True): out.append(line) + self.assertTrue(len(out) == 42) + + # verify that nothing is going to stdout + out = [] + for line in python(py.name, _iter="out", u=True): out.append(line) + self.assertTrue(len(out) == 0) + + + + def test_piped_generator(self): + from sh import tr + from string import ascii_uppercase + import time + + py1 = create_tmp_test(""" +import sys +import os +import time + +for letter in "andrew": + time.sleep(0.5) + print(letter) + """) + + py2 = create_tmp_test(""" +import sys +import os +import time + +while True: + line = sys.stdin.readline() + if not line: break + print(line.strip().upper()) + """) + + + times = [] + last_received = None + + letters = "" + for line in python(python(py1.name, _piped="out", u=True), py2.name, _iter=True, u=True): + if not letters: start = time.time() + letters += line.strip() + + now = time.time() + if last_received: times.append(now - last_received) + last_received = now + + self.assertEqual("ANDREW", letters) + self.assertTrue(all([t > .3 for t in times])) + + + def test_generator_and_callback(self): + py = create_tmp_test(""" +import sys +import os + +for i in range(42): + sys.stderr.write(str(i * 2)+"\\n") + print(i) +""") + + stderr = [] + def agg(line): stderr.append(int(line.strip())) + + out = [] + for line in python(py.name, _iter=True, _err=agg, u=True): out.append(line) + + self.assertTrue(len(out) == 42) + self.assertTrue(sum(stderr) == 1722) + + + def test_bg_to_int(self): + from sh import echo + # bugs with background might cause the following error: + # ValueError: invalid literal for int() with base 10: '' + self.assertEqual(int(echo("123", _bg=True)), 123) + + + def test_cwd(self): + from sh import pwd + from os.path import realpath + self.assertEqual(str(pwd(_cwd="/tmp")), realpath("/tmp")+"\n") + self.assertEqual(str(pwd(_cwd="/etc")), realpath("/etc")+"\n") + + + def test_huge_piped_data(self): + from sh import tr + + stdin = tempfile.NamedTemporaryFile() + + data = "herpderp" * 4000 + "\n" + stdin.write(data.encode()) + stdin.flush() + stdin.seek(0) + + out = tr(tr("[:lower:]", "[:upper:]", _in=data), "[:upper:]", "[:lower:]") + self.assertTrue(out == data) + + + def test_tty_input(self): + py = create_tmp_test(""" +import sys +import os + +if os.isatty(sys.stdin.fileno()): + sys.stdout.write("password?\\n") + sys.stdout.flush() + pw = sys.stdin.readline().strip() + sys.stdout.write("%s\\n" % ("*" * len(pw))) + sys.stdout.flush() +else: + sys.stdout.write("no tty attached!\\n") + sys.stdout.flush() +""") + + test_pw = "test123" + expected_stars = "*" * len(test_pw) + d = {} + + def password_enterer(line, stdin): + line = line.strip() + if not line: return + + if line == "password?": + stdin.put(test_pw+"\n") + + elif line.startswith("*"): + d["stars"] = line + return True + + pw_stars = python(py.name, _tty_in=True, _out=password_enterer) + pw_stars.wait() + self.assertEqual(d["stars"], expected_stars) + + response = python(py.name) + self.assertEqual(response, "no tty attached!\n") + + + def test_stringio_output(self): + from sh import echo + if IS_PY3: + from io import StringIO + from io import BytesIO as cStringIO + else: + from StringIO import StringIO + from cStringIO import StringIO as cStringIO + + out = StringIO() + echo("-n", "testing 123", _out=out) + self.assertEqual(out.getvalue(), "testing 123") + + out = cStringIO() + echo("-n", "testing 123", _out=out) + self.assertEqual(out.getvalue().decode(), "testing 123") + + + def test_stringio_input(self): + from sh import cat + + if IS_PY3: + from io import StringIO + from io import BytesIO as cStringIO + else: + from StringIO import StringIO + from cStringIO import StringIO as cStringIO + + input = StringIO() + input.write("herpderp") + input.seek(0) + + out = cat(_in=input) + self.assertEqual(out, "herpderp") + + + def test_internal_bufsize(self): + from sh import cat + + output = cat(_in="a"*1000, _internal_bufsize=100, _out_bufsize=0) + self.assertEqual(len(output), 100) + + output = cat(_in="a"*1000, _internal_bufsize=50, _out_bufsize=2) + self.assertEqual(len(output), 100) + + + def test_change_stdout_buffering(self): + py = create_tmp_test(""" +import sys +import os + +# this proves that we won't get the output into our callback until we send +# a newline +sys.stdout.write("switch ") +sys.stdout.flush() +sys.stdout.write("buffering\\n") +sys.stdout.flush() + +sys.stdin.read(1) +sys.stdout.write("unbuffered") +sys.stdout.flush() + +# this is to keep the output from being flushed by the process ending, which +# would ruin our test. we want to make sure we get the string "unbuffered" +# before the process ends, without writing a newline +sys.stdin.read(1) +""") + + d = {"success": False} + def interact(line, stdin, process): + line = line.strip() + if not line: return + + if line == "switch buffering": + process.out_bufsize(0) + stdin.put("a") + + elif line == "unbuffered": + stdin.put("b") + d["success"] = True + return True + + # start with line buffered stdout + pw_stars = python(py.name, _out=interact, _out_bufsize=1, u=True) + pw_stars.wait() + + self.assertTrue(d["success"]) + + + + def test_encoding(self): + return + raise NotImplementedError("what's the best way to test a different \ +'_encoding' special keyword argument?") + + + def test_timeout(self): + from sh import sleep + from time import time + + # check that a normal sleep is more or less how long the whole process + # takes + sleep_for = 3 + started = time() + sh.sleep(sleep_for).wait() + elapsed = time() - started + + self.assertTrue(abs(elapsed - sleep_for) < 0.1) + + # now make sure that killing early makes the process take less time + sleep_for = 3 + timeout = 1 + started = time() + sh.sleep(sleep_for, _timeout=timeout).wait() + elapsed = time() - started + self.assertTrue(abs(elapsed - timeout) < 0.1) + + + def test_binary_pipe(self): + binary = b'\xec;\xedr\xdbF\x92\xf9\x8d\xa7\x98\x02/\x15\xd2K\xc3\x94d\xc9' + + py1 = create_tmp_test(""" +import sys +import os + +sys.stdout = os.fdopen(sys.stdout.fileno(), "wb", 0) +sys.stdout.write(%r) +""" % binary) + + py2 = create_tmp_test(""" +import sys +import os + +sys.stdin = os.fdopen(sys.stdin.fileno(), "rb", 0) +sys.stdout = os.fdopen(sys.stdout.fileno(), "wb", 0) +sys.stdout.write(sys.stdin.read()) +""") + out = python(python(py1.name), py2.name) + self.assertEqual(out.stdout, binary) + + + def test_auto_change_buffering(self): + binary = b'\xec;\xedr\xdbF\x92\xf9\x8d\xa7\x98\x02/\x15\xd2K\xc3\x94d\xc9' + py1 = create_tmp_test(""" +import sys +import os +import time + +sys.stdout = os.fdopen(sys.stdout.fileno(), "wb", 0) +sys.stdout.write(b"testing") +sys.stdout.flush() +# to ensure that sh's select loop picks up the write before we write again +time.sleep(0.5) +sys.stdout.write(b"again\\n") +sys.stdout.flush() +time.sleep(0.5) +sys.stdout.write(%r) +sys.stdout.flush() +""" % binary) + + out = python(py1.name, _out_bufsize=1) + self.assertTrue(out.stdout == b'testingagain\n\xec;\xedr\xdbF\x92\xf9\x8d\xa7\x98\x02/\x15\xd2K\xc3\x94d\xc9') + + + # designed to trigger the "... (%d more, please see e.stdout)" output + # of the ErrorReturnCode class + def test_failure_with_large_output(self): + from sh import ErrorReturnCode_1 + + py = create_tmp_test(""" +print("andrewmoffat" * 1000) +exit(1) +""") + self.assertRaises(ErrorReturnCode_1, python, py.name) + + # designed to check if the ErrorReturnCode constructor does not raise + # an UnicodeDecodeError + def test_non_ascii_error(self): + from sh import ls, ErrorReturnCode + + test = "/á" + if not IS_PY3: test = test.decode("utf8") + + self.assertRaises(ErrorReturnCode, ls, test) + + + def test_no_out(self): + py = create_tmp_test(""" +import sys +sys.stdout.write("stdout") +sys.stderr.write("stderr") +""") + p = python(py.name, _no_out=True) + self.assertEqual(p.stdout, b"") + self.assertEqual(p.stderr, b"stderr") + self.assertTrue(p.process._pipe_queue.empty()) + + def callback(line): pass + p = python(py.name, _out=callback) + self.assertEqual(p.stdout, b"") + self.assertEqual(p.stderr, b"stderr") + self.assertTrue(p.process._pipe_queue.empty()) + + p = python(py.name) + self.assertEqual(p.stdout, b"stdout") + self.assertEqual(p.stderr, b"stderr") + self.assertFalse(p.process._pipe_queue.empty()) + + + def test_no_err(self): + py = create_tmp_test(""" +import sys +sys.stdout.write("stdout") +sys.stderr.write("stderr") +""") + p = python(py.name, _no_err=True) + self.assertEqual(p.stderr, b"") + self.assertEqual(p.stdout, b"stdout") + self.assertFalse(p.process._pipe_queue.empty()) + + def callback(line): pass + p = python(py.name, _err=callback) + self.assertEqual(p.stderr, b"") + self.assertEqual(p.stdout, b"stdout") + self.assertFalse(p.process._pipe_queue.empty()) + + p = python(py.name) + self.assertEqual(p.stderr, b"stderr") + self.assertEqual(p.stdout, b"stdout") + self.assertFalse(p.process._pipe_queue.empty()) + + + def test_no_pipe(self): + from sh import ls + + p = ls() + self.assertFalse(p.process._pipe_queue.empty()) + + def callback(line): pass + p = ls(_out=callback) + self.assertTrue(p.process._pipe_queue.empty()) + + p = ls(_no_pipe=True) + self.assertTrue(p.process._pipe_queue.empty()) + + + def test_decode_error_handling(self): + from functools import partial + + py = create_tmp_test(""" +# -*- coding: utf8 -*- +import sys +sys.stdout.write("te漢字st") +""") + fn = partial(python, py.name, _encoding="ascii") + def s(fn): str(fn()) + self.assertRaises(UnicodeDecodeError, s, fn) + + p = python(py.name, _encoding="ascii", _decode_errors="ignore") + self.assertEqual(p, "test") + + +if __name__ == "__main__": + if len(sys.argv) > 1: + unittest.main() + else: + suite = unittest.TestLoader().loadTestsFromTestCase(Basic) + unittest.TextTestRunner(verbosity=2).run(suite) -- cgit v1.2.3