summaryrefslogtreecommitdiff
path: root/test.py
diff options
context:
space:
mode:
Diffstat (limited to 'test.py')
-rw-r--r--test.py1312
1 files changed, 1312 insertions, 0 deletions
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..efa84d4
--- /dev/null
+++ b/test.py
@@ -0,0 +1,1312 @@
+# -*- coding: utf8 -*-
+
+import os
+import unittest
+import tempfile
+import sys
+import sh
+import platform
+
+IS_OSX = platform.system() == "Darwin"
+IS_PY3 = sys.version_info[0] == 3
+if IS_PY3:
+ unicode = str
+ python = sh.Command(sh.which("python%d.%d" % sys.version_info[:2]))
+else:
+ from sh import python
+
+
+THIS_DIR = os.path.dirname(os.path.abspath(__file__))
+
+skipUnless = getattr(unittest, "skipUnless", None)
+if not skipUnless:
+ def skipUnless(*args, **kwargs):
+ def wrapper(thing): return thing
+ return wrapper
+
+requires_posix = skipUnless(os.name == "posix", "Requires POSIX")
+
+
+
+def create_tmp_test(code):
+ py = tempfile.NamedTemporaryFile()
+ if IS_PY3: code = bytes(code, "UTF-8")
+ py.write(code)
+ py.flush()
+ # we don't explicitly close, because close will remove the file, and we
+ # don't want that until the test case is done. so we let the gc close it
+ # when it goes out of scope
+ return py
+
+
+
+@requires_posix
+class Basic(unittest.TestCase):
+
+ def test_print_command(self):
+ from sh import ls, which
+ actual_location = which("ls")
+ out = str(ls)
+ self.assertEqual(out, actual_location)
+
+ def test_unicode_arg(self):
+ from sh import echo
+
+ test = "漢字"
+ if not IS_PY3: test = test.decode("utf8")
+
+ p = echo(test).strip()
+ self.assertEqual(test, p)
+
+ def test_number_arg(self):
+ py = create_tmp_test("""
+from optparse import OptionParser
+parser = OptionParser()
+options, args = parser.parse_args()
+print(args[0])
+""")
+
+ out = python(py.name, 3).strip()
+ self.assertEqual(out, "3")
+
+ def test_exit_code(self):
+ from sh import ls, ErrorReturnCode
+ self.assertEqual(ls("/").exit_code, 0)
+ self.assertRaises(ErrorReturnCode, ls, "/aofwje/garogjao4a/eoan3on")
+
+ def test_glob_warning(self):
+ from sh import ls
+ from glob import glob
+ import warnings
+
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+
+ ls(glob("ofjaoweijfaowe"))
+
+ self.assertTrue(len(w) == 1)
+ self.assertTrue(issubclass(w[-1].category, UserWarning))
+ self.assertTrue("glob" in str(w[-1].message))
+
+ def test_stdin_from_string(self):
+ from sh import sed
+ self.assertEqual(sed(_in="test", e="s/test/lol/").strip(), "lol")
+
+ def test_ok_code(self):
+ from sh import ls, ErrorReturnCode_1, ErrorReturnCode_2
+
+ exc_to_test = ErrorReturnCode_2
+ code_to_pass = 2
+ if IS_OSX:
+ exc_to_test = ErrorReturnCode_1
+ code_to_pass = 1
+ self.assertRaises(exc_to_test, ls, "/aofwje/garogjao4a/eoan3on")
+
+ ls("/aofwje/garogjao4a/eoan3on", _ok_code=code_to_pass)
+ ls("/aofwje/garogjao4a/eoan3on", _ok_code=[code_to_pass])
+
+ def test_quote_escaping(self):
+ py = create_tmp_test("""
+from optparse import OptionParser
+parser = OptionParser()
+options, args = parser.parse_args()
+print(args)
+""")
+ out = python(py.name, "one two three").strip()
+ self.assertEqual(out, "['one two three']")
+
+ out = python(py.name, "one \"two three").strip()
+ self.assertEqual(out, "['one \"two three']")
+
+ out = python(py.name, "one", "two three").strip()
+ self.assertEqual(out, "['one', 'two three']")
+
+ out = python(py.name, "one", "two \"haha\" three").strip()
+ self.assertEqual(out, "['one', 'two \"haha\" three']")
+
+ out = python(py.name, "one two's three").strip()
+ self.assertEqual(out, "[\"one two's three\"]")
+
+ out = python(py.name, 'one two\'s three').strip()
+ self.assertEqual(out, "[\"one two's three\"]")
+
+ def test_multiple_pipes(self):
+ from sh import tr, python
+ import time
+
+ py = create_tmp_test("""
+import sys
+import os
+import time
+
+for l in "andrew":
+ print(l)
+ time.sleep(.2)
+""")
+
+ class Derp(object):
+ def __init__(self):
+ self.times = []
+ self.stdout = []
+ self.last_received = None
+
+ def agg(self, line):
+ self.stdout.append(line.strip())
+ now = time.time()
+ if self.last_received: self.times.append(now - self.last_received)
+ self.last_received = now
+
+ derp = Derp()
+
+ p = tr(
+ tr(
+ tr(
+ python(py.name, _piped=True),
+ "aw", "wa", _piped=True),
+ "ne", "en", _piped=True),
+ "dr", "rd", _out=derp.agg)
+
+ p.wait()
+ self.assertEqual("".join(derp.stdout), "werdna")
+ self.assertTrue(all([t > .15 for t in derp.times]))
+
+
+ def test_manual_stdin_string(self):
+ from sh import tr
+
+ out = tr("[:lower:]", "[:upper:]", _in="andrew").strip()
+ self.assertEqual(out, "ANDREW")
+
+
+ def test_manual_stdin_iterable(self):
+ from sh import tr
+
+ test = ["testing\n", "herp\n", "derp\n"]
+ out = tr("[:lower:]", "[:upper:]", _in=test)
+
+ match = "".join([t.upper() for t in test])
+ self.assertEqual(out, match)
+
+
+ def test_manual_stdin_file(self):
+ from sh import tr
+ import tempfile
+
+ test_string = "testing\nherp\nderp\n"
+
+ stdin = tempfile.NamedTemporaryFile()
+ stdin.write(test_string.encode())
+ stdin.flush()
+ stdin.seek(0)
+
+ out = tr("[:lower:]", "[:upper:]", _in=stdin)
+
+ self.assertEqual(out, test_string.upper())
+
+
+ def test_manual_stdin_queue(self):
+ from sh import tr
+ try: from Queue import Queue, Empty
+ except ImportError: from queue import Queue, Empty
+
+ test = ["testing\n", "herp\n", "derp\n"]
+
+ q = Queue()
+ for t in test: q.put(t)
+ q.put(None) # EOF
+
+ out = tr("[:lower:]", "[:upper:]", _in=q)
+
+ match = "".join([t.upper() for t in test])
+ self.assertEqual(out, match)
+
+
+ def test_environment(self):
+ import os
+
+ env = {"HERP": "DERP"}
+
+ py = create_tmp_test("""
+import os
+
+osx_cruft = ["__CF_USER_TEXT_ENCODING", "__PYVENV_LAUNCHER__"]
+for key in osx_cruft:
+ try: del os.environ[key]
+ except: pass
+print(os.environ["HERP"] + " " + str(len(os.environ)))
+""")
+ out = python(py.name, _env=env).strip()
+ self.assertEqual(out, "DERP 1")
+
+ py = create_tmp_test("""
+import os, sys
+sys.path.insert(0, os.getcwd())
+import sh
+osx_cruft = ["__CF_USER_TEXT_ENCODING", "__PYVENV_LAUNCHER__"]
+for key in osx_cruft:
+ try: del os.environ[key]
+ except: pass
+print(sh.HERP + " " + str(len(os.environ)))
+""")
+ out = python(py.name, _env=env, _cwd=THIS_DIR).strip()
+ self.assertEqual(out, "DERP 1")
+
+
+ def test_which(self):
+ from sh import which, ls
+ self.assertEqual(which("fjoawjefojawe"), None)
+ self.assertEqual(which("ls"), str(ls))
+
+
+ def test_foreground(self):
+ return
+ raise NotImplementedError
+
+ def test_no_arg(self):
+ import pwd
+ from sh import whoami
+ u1 = whoami().strip()
+ u2 = pwd.getpwuid(os.geteuid())[0]
+ self.assertEqual(u1, u2)
+
+ def test_incompatible_special_args(self):
+ from sh import ls
+ self.assertRaises(TypeError, ls, _iter=True, _piped=True)
+
+
+ def test_exception(self):
+ from sh import ls, ErrorReturnCode_1, ErrorReturnCode_2
+
+ exc_to_test = ErrorReturnCode_2
+ if IS_OSX: exc_to_test = ErrorReturnCode_1
+ self.assertRaises(exc_to_test, ls, "/aofwje/garogjao4a/eoan3on")
+
+
+ def test_command_not_found(self):
+ from sh import CommandNotFound
+
+ def do_import(): from sh import aowjgoawjoeijaowjellll
+ self.assertRaises(ImportError, do_import)
+
+ def do_import():
+ import sh
+ sh.awoefaowejfw
+ self.assertRaises(CommandNotFound, do_import)
+
+
+ def test_command_wrapper_equivalence(self):
+ from sh import Command, ls, which
+
+ self.assertEqual(Command(which("ls")), ls)
+
+
+ def test_multiple_args_short_option(self):
+ py = create_tmp_test("""
+from optparse import OptionParser
+parser = OptionParser()
+parser.add_option("-l", dest="long_option")
+options, args = parser.parse_args()
+print(len(options.long_option.split()))
+""")
+ num_args = int(python(py.name, l="one two three"))
+ self.assertEqual(num_args, 3)
+
+ num_args = int(python(py.name, "-l", "one's two's three's"))
+ self.assertEqual(num_args, 3)
+
+
+ def test_multiple_args_long_option(self):
+ py = create_tmp_test("""
+from optparse import OptionParser
+parser = OptionParser()
+parser.add_option("-l", "--long-option", dest="long_option")
+options, args = parser.parse_args()
+print(len(options.long_option.split()))
+""")
+ num_args = int(python(py.name, long_option="one two three"))
+ self.assertEqual(num_args, 3)
+
+ num_args = int(python(py.name, "--long-option", "one's two's three's"))
+ self.assertEqual(num_args, 3)
+
+
+ def test_short_bool_option(self):
+ py = create_tmp_test("""
+from optparse import OptionParser
+parser = OptionParser()
+parser.add_option("-s", action="store_true", default=False, dest="short_option")
+options, args = parser.parse_args()
+print(options.short_option)
+""")
+ self.assertTrue(python(py.name, s=True).strip() == "True")
+ self.assertTrue(python(py.name, s=False).strip() == "False")
+ self.assertTrue(python(py.name).strip() == "False")
+
+
+ def test_long_bool_option(self):
+ py = create_tmp_test("""
+from optparse import OptionParser
+parser = OptionParser()
+parser.add_option("-l", "--long-option", action="store_true", default=False, dest="long_option")
+options, args = parser.parse_args()
+print(options.long_option)
+""")
+ self.assertTrue(python(py.name, long_option=True).strip() == "True")
+ self.assertTrue(python(py.name).strip() == "False")
+
+
+ def test_composition(self):
+ from sh import ls, wc
+ c1 = int(wc(ls("-A1"), l=True))
+ c2 = len(os.listdir("."))
+ self.assertEqual(c1, c2)
+
+ def test_incremental_composition(self):
+ from sh import ls, wc
+ c1 = int(wc(ls("-A1", _piped=True), l=True).strip())
+ c2 = len(os.listdir("."))
+ if c1 != c2:
+ with open("/tmp/fail", "a") as h: h.write("FUCK\n")
+ self.assertEqual(c1, c2)
+
+
+ def test_short_option(self):
+ from sh import sh
+ s1 = sh(c="echo test").strip()
+ s2 = "test"
+ self.assertEqual(s1, s2)
+
+
+ def test_long_option(self):
+ py = create_tmp_test("""
+from optparse import OptionParser
+parser = OptionParser()
+parser.add_option("-l", "--long-option", action="store", default="", dest="long_option")
+options, args = parser.parse_args()
+print(options.long_option.upper())
+""")
+ self.assertTrue(python(py.name, long_option="testing").strip() == "TESTING")
+ self.assertTrue(python(py.name).strip() == "")
+
+
+ def test_command_wrapper(self):
+ from sh import Command, which
+
+ ls = Command(which("ls"))
+ wc = Command(which("wc"))
+
+ c1 = int(wc(ls("-A1"), l=True))
+ c2 = len(os.listdir("."))
+
+ self.assertEqual(c1, c2)
+
+
+ def test_background(self):
+ from sh import sleep
+ import time
+
+ start = time.time()
+ sleep_time = .5
+ p = sleep(sleep_time, _bg=True)
+
+ now = time.time()
+ self.assertTrue(now - start < sleep_time)
+
+ p.wait()
+ now = time.time()
+ self.assertTrue(now - start > sleep_time)
+
+
+ def test_background_exception(self):
+ from sh import ls, ErrorReturnCode_1, ErrorReturnCode_2
+ p = ls("/ofawjeofj", _bg=True) # should not raise
+
+ exc_to_test = ErrorReturnCode_2
+ if IS_OSX: exc_to_test = ErrorReturnCode_1
+ self.assertRaises(exc_to_test, p.wait) # should raise
+
+
+ def test_with_context(self):
+ from sh import whoami
+ import getpass
+
+ py = create_tmp_test("""
+import sys
+import os
+import subprocess
+
+print("with_context")
+subprocess.Popen(sys.argv[1:], shell=False).wait()
+""")
+
+ cmd1 = python.bake(py.name, _with=True)
+ with cmd1:
+ out = whoami()
+ self.assertTrue("with_context" in out)
+ self.assertTrue(getpass.getuser() in out)
+
+
+ def test_with_context_args(self):
+ from sh import whoami
+ import getpass
+
+ py = create_tmp_test("""
+import sys
+import os
+import subprocess
+from optparse import OptionParser
+
+parser = OptionParser()
+parser.add_option("-o", "--opt", action="store_true", default=False, dest="opt")
+options, args = parser.parse_args()
+
+if options.opt:
+ subprocess.Popen(args[0], shell=False).wait()
+""")
+ with python(py.name, opt=True, _with=True):
+ out = whoami()
+ self.assertTrue(getpass.getuser() == out.strip())
+
+
+ with python(py.name, _with=True):
+ out = whoami()
+ self.assertTrue(out == "")
+
+
+
+ def test_err_to_out(self):
+ py = create_tmp_test("""
+import sys
+import os
+
+sys.stdout.write("stdout")
+sys.stdout.flush()
+sys.stderr.write("stderr")
+sys.stderr.flush()
+""")
+ stdout = python(py.name, _err_to_out=True)
+ self.assertTrue(stdout == "stdoutstderr")
+
+
+
+ def test_out_redirection(self):
+ import tempfile
+
+ py = create_tmp_test("""
+import sys
+import os
+
+sys.stdout.write("stdout")
+sys.stderr.write("stderr")
+""")
+
+ file_obj = tempfile.TemporaryFile()
+ out = python(py.name, _out=file_obj)
+
+ self.assertTrue(len(out) == 0)
+
+ file_obj.seek(0)
+ actual_out = file_obj.read()
+ file_obj.close()
+
+ self.assertTrue(len(actual_out) != 0)
+
+
+ # test with tee
+ file_obj = tempfile.TemporaryFile()
+ out = python(py.name, _out=file_obj, _tee=True)
+
+ self.assertTrue(len(out) != 0)
+
+ file_obj.seek(0)
+ actual_out = file_obj.read()
+ file_obj.close()
+
+ self.assertTrue(len(actual_out) != 0)
+
+
+
+ def test_err_redirection(self):
+ import tempfile
+
+ py = create_tmp_test("""
+import sys
+import os
+
+sys.stdout.write("stdout")
+sys.stderr.write("stderr")
+""")
+ file_obj = tempfile.TemporaryFile()
+ p = python(py.name, _err=file_obj)
+
+ file_obj.seek(0)
+ stderr = file_obj.read().decode()
+ file_obj.close()
+
+ self.assertTrue(p.stdout == b"stdout")
+ self.assertTrue(stderr == "stderr")
+ self.assertTrue(len(p.stderr) == 0)
+
+ # now with tee
+ file_obj = tempfile.TemporaryFile()
+ p = python(py.name, _err=file_obj, _tee="err")
+
+ file_obj.seek(0)
+ stderr = file_obj.read().decode()
+ file_obj.close()
+
+ self.assertTrue(p.stdout == b"stdout")
+ self.assertTrue(stderr == "stderr")
+ self.assertTrue(len(p.stderr) != 0)
+
+
+
+ def test_err_redirection_actual_file(self):
+ import tempfile
+ file_obj = tempfile.NamedTemporaryFile()
+ py = create_tmp_test("""
+import sys
+import os
+
+sys.stdout.write("stdout")
+sys.stderr.write("stderr")
+""")
+ stdout = python(py.name, _err=file_obj.name, u=True).wait()
+ file_obj.seek(0)
+ stderr = file_obj.read().decode()
+ file_obj.close()
+ self.assertTrue(stdout == "stdout")
+ self.assertTrue(stderr == "stderr")
+
+ def test_subcommand_and_bake(self):
+ from sh import ls
+ import getpass
+
+ py = create_tmp_test("""
+import sys
+import os
+import subprocess
+
+print("subcommand")
+subprocess.Popen(sys.argv[1:], shell=False).wait()
+""")
+
+ cmd1 = python.bake(py.name)
+ out = cmd1.whoami()
+ self.assertTrue("subcommand" in out)
+ self.assertTrue(getpass.getuser() in out)
+
+
+ def test_multiple_bakes(self):
+ from sh import whoami
+ import getpass
+
+ py = create_tmp_test("""
+import sys
+import subprocess
+subprocess.Popen(sys.argv[1:], shell=False).wait()
+""")
+
+ out = python.bake(py.name).bake("whoami")()
+ self.assertTrue(getpass.getuser() == out.strip())
+
+
+
+ def test_bake_args_come_first(self):
+ from sh import ls
+ ls = ls.bake(h=True)
+
+ ran = ls("-la").ran
+ ft = ran.index("-h")
+ self.assertTrue("-la" in ran[ft:])
+
+ def test_output_equivalence(self):
+ from sh import whoami
+
+ iam1 = whoami()
+ iam2 = whoami()
+
+ self.assertEqual(iam1, iam2)
+
+
+ def test_stdout_callback(self):
+ py = create_tmp_test("""
+import sys
+import os
+
+for i in range(5): print(i)
+""")
+ stdout = []
+ def agg(line):
+ stdout.append(line)
+
+ p = python(py.name, _out=agg, u=True)
+ p.wait()
+
+ self.assertTrue(len(stdout) == 5)
+
+
+
+ def test_stdout_callback_no_wait(self):
+ import time
+
+ py = create_tmp_test("""
+import sys
+import os
+import time
+
+for i in range(5):
+ print(i)
+ time.sleep(.5)
+""")
+
+ stdout = []
+ def agg(line): stdout.append(line)
+
+ p = python(py.name, _out=agg, u=True)
+
+ # we give a little pause to make sure that the NamedTemporaryFile
+ # exists when the python process actually starts
+ time.sleep(.5)
+
+ self.assertTrue(len(stdout) != 5)
+
+
+
+ def test_stdout_callback_line_buffered(self):
+ py = create_tmp_test("""
+import sys
+import os
+
+for i in range(5): print("herpderp")
+""")
+
+ stdout = []
+ def agg(line): stdout.append(line)
+
+ p = python(py.name, _out=agg, _out_bufsize=1, u=True)
+ p.wait()
+
+ self.assertTrue(len(stdout) == 5)
+
+
+
+ def test_stdout_callback_line_unbuffered(self):
+ py = create_tmp_test("""
+import sys
+import os
+
+for i in range(5): print("herpderp")
+""")
+
+ stdout = []
+ def agg(char): stdout.append(char)
+
+ p = python(py.name, _out=agg, _out_bufsize=0, u=True)
+ p.wait()
+
+ # + 5 newlines
+ self.assertTrue(len(stdout) == (len("herpderp") * 5 + 5))
+
+
+ def test_stdout_callback_buffered(self):
+ py = create_tmp_test("""
+import sys
+import os
+
+for i in range(5): sys.stdout.write("herpderp")
+""")
+
+ stdout = []
+ def agg(chunk): stdout.append(chunk)
+
+ p = python(py.name, _out=agg, _out_bufsize=4, u=True)
+ p.wait()
+
+ self.assertTrue(len(stdout) == (len("herp")/2 * 5))
+
+
+
+ def test_stdout_callback_with_input(self):
+ py = create_tmp_test("""
+import sys
+import os
+IS_PY3 = sys.version_info[0] == 3
+if IS_PY3: raw_input = input
+
+for i in range(5): print(str(i))
+derp = raw_input("herp? ")
+print(derp)
+""")
+
+ def agg(line, stdin):
+ if line.strip() == "4": stdin.put("derp\n")
+
+ p = python(py.name, _out=agg, u=True, _tee=True)
+ p.wait()
+
+ self.assertTrue("derp" in p)
+
+
+
+ def test_stdout_callback_exit(self):
+ py = create_tmp_test("""
+import sys
+import os
+
+for i in range(5): print(i)
+""")
+
+ stdout = []
+ def agg(line):
+ line = line.strip()
+ stdout.append(line)
+ if line == "2": return True
+
+ p = python(py.name, _out=agg, u=True, _tee=True)
+ p.wait()
+
+ self.assertTrue("4" in p)
+ self.assertTrue("4" not in stdout)
+
+
+
+ def test_stdout_callback_terminate(self):
+ import signal
+ py = create_tmp_test("""
+import sys
+import os
+import time
+
+for i in range(5):
+ print(i)
+ time.sleep(.5)
+""")
+
+ stdout = []
+ def agg(line, stdin, process):
+ line = line.strip()
+ stdout.append(line)
+ if line == "3":
+ process.terminate()
+ return True
+
+ p = python(py.name, _out=agg, u=True)
+ p.wait()
+
+ self.assertEqual(p.process.exit_code, -signal.SIGTERM)
+ self.assertTrue("4" not in p)
+ self.assertTrue("4" not in stdout)
+
+
+
+ def test_stdout_callback_kill(self):
+ import signal
+
+ py = create_tmp_test("""
+import sys
+import os
+import time
+
+for i in range(5):
+ print(i)
+ time.sleep(.5)
+""")
+
+ stdout = []
+ def agg(line, stdin, process):
+ line = line.strip()
+ stdout.append(line)
+ if line == "3":
+ process.kill()
+ return True
+
+ p = python(py.name, _out=agg, u=True)
+ p.wait()
+
+ self.assertEqual(p.process.exit_code, -signal.SIGKILL)
+ self.assertTrue("4" not in p)
+ self.assertTrue("4" not in stdout)
+
+ def test_general_signal(self):
+ import signal
+ from signal import SIGINT
+
+ py = create_tmp_test("""
+import sys
+import os
+import time
+import signal
+
+def sig_handler(sig, frame):
+ print(10)
+ exit(0)
+
+signal.signal(signal.SIGINT, sig_handler)
+
+for i in range(5):
+ print(i)
+ sys.stdout.flush()
+ time.sleep(0.5)
+""")
+
+ stdout = []
+ def agg(line, stdin, process):
+ line = line.strip()
+ stdout.append(line)
+ if line == "3":
+ process.signal(SIGINT)
+ return True
+
+ p = python(py.name, _out=agg, _tee=True)
+ p.wait()
+
+ self.assertEqual(p.process.exit_code, 0)
+ self.assertEqual(p, "0\n1\n2\n3\n10\n")
+
+
+ def test_iter_generator(self):
+ py = create_tmp_test("""
+import sys
+import os
+import time
+
+for i in range(42):
+ print(i)
+ sys.stdout.flush()
+""")
+
+ out = []
+ for line in python(py.name, _iter=True):
+ out.append(int(line.strip()))
+ self.assertTrue(len(out) == 42 and sum(out) == 861)
+
+
+ def test_nonblocking_iter(self):
+ import tempfile
+ from sh import tail
+ from errno import EWOULDBLOCK
+
+ tmp = tempfile.NamedTemporaryFile()
+ for line in tail("-f", tmp.name, _iter_noblock=True): break
+ self.assertEqual(line, EWOULDBLOCK)
+
+
+ def test_for_generator_to_err(self):
+ py = create_tmp_test("""
+import sys
+import os
+
+for i in range(42):
+ sys.stderr.write(str(i)+"\\n")
+""")
+
+ out = []
+ for line in python(py.name, _iter="err", u=True): out.append(line)
+ self.assertTrue(len(out) == 42)
+
+ # verify that nothing is going to stdout
+ out = []
+ for line in python(py.name, _iter="out", u=True): out.append(line)
+ self.assertTrue(len(out) == 0)
+
+
+
+ def test_piped_generator(self):
+ from sh import tr
+ from string import ascii_uppercase
+ import time
+
+ py1 = create_tmp_test("""
+import sys
+import os
+import time
+
+for letter in "andrew":
+ time.sleep(0.5)
+ print(letter)
+ """)
+
+ py2 = create_tmp_test("""
+import sys
+import os
+import time
+
+while True:
+ line = sys.stdin.readline()
+ if not line: break
+ print(line.strip().upper())
+ """)
+
+
+ times = []
+ last_received = None
+
+ letters = ""
+ for line in python(python(py1.name, _piped="out", u=True), py2.name, _iter=True, u=True):
+ if not letters: start = time.time()
+ letters += line.strip()
+
+ now = time.time()
+ if last_received: times.append(now - last_received)
+ last_received = now
+
+ self.assertEqual("ANDREW", letters)
+ self.assertTrue(all([t > .3 for t in times]))
+
+
+ def test_generator_and_callback(self):
+ py = create_tmp_test("""
+import sys
+import os
+
+for i in range(42):
+ sys.stderr.write(str(i * 2)+"\\n")
+ print(i)
+""")
+
+ stderr = []
+ def agg(line): stderr.append(int(line.strip()))
+
+ out = []
+ for line in python(py.name, _iter=True, _err=agg, u=True): out.append(line)
+
+ self.assertTrue(len(out) == 42)
+ self.assertTrue(sum(stderr) == 1722)
+
+
+ def test_bg_to_int(self):
+ from sh import echo
+ # bugs with background might cause the following error:
+ # ValueError: invalid literal for int() with base 10: ''
+ self.assertEqual(int(echo("123", _bg=True)), 123)
+
+
+ def test_cwd(self):
+ from sh import pwd
+ from os.path import realpath
+ self.assertEqual(str(pwd(_cwd="/tmp")), realpath("/tmp")+"\n")
+ self.assertEqual(str(pwd(_cwd="/etc")), realpath("/etc")+"\n")
+
+
+ def test_huge_piped_data(self):
+ from sh import tr
+
+ stdin = tempfile.NamedTemporaryFile()
+
+ data = "herpderp" * 4000 + "\n"
+ stdin.write(data.encode())
+ stdin.flush()
+ stdin.seek(0)
+
+ out = tr(tr("[:lower:]", "[:upper:]", _in=data), "[:upper:]", "[:lower:]")
+ self.assertTrue(out == data)
+
+
+ def test_tty_input(self):
+ py = create_tmp_test("""
+import sys
+import os
+
+if os.isatty(sys.stdin.fileno()):
+ sys.stdout.write("password?\\n")
+ sys.stdout.flush()
+ pw = sys.stdin.readline().strip()
+ sys.stdout.write("%s\\n" % ("*" * len(pw)))
+ sys.stdout.flush()
+else:
+ sys.stdout.write("no tty attached!\\n")
+ sys.stdout.flush()
+""")
+
+ test_pw = "test123"
+ expected_stars = "*" * len(test_pw)
+ d = {}
+
+ def password_enterer(line, stdin):
+ line = line.strip()
+ if not line: return
+
+ if line == "password?":
+ stdin.put(test_pw+"\n")
+
+ elif line.startswith("*"):
+ d["stars"] = line
+ return True
+
+ pw_stars = python(py.name, _tty_in=True, _out=password_enterer)
+ pw_stars.wait()
+ self.assertEqual(d["stars"], expected_stars)
+
+ response = python(py.name)
+ self.assertEqual(response, "no tty attached!\n")
+
+
+ def test_stringio_output(self):
+ from sh import echo
+ if IS_PY3:
+ from io import StringIO
+ from io import BytesIO as cStringIO
+ else:
+ from StringIO import StringIO
+ from cStringIO import StringIO as cStringIO
+
+ out = StringIO()
+ echo("-n", "testing 123", _out=out)
+ self.assertEqual(out.getvalue(), "testing 123")
+
+ out = cStringIO()
+ echo("-n", "testing 123", _out=out)
+ self.assertEqual(out.getvalue().decode(), "testing 123")
+
+
+ def test_stringio_input(self):
+ from sh import cat
+
+ if IS_PY3:
+ from io import StringIO
+ from io import BytesIO as cStringIO
+ else:
+ from StringIO import StringIO
+ from cStringIO import StringIO as cStringIO
+
+ input = StringIO()
+ input.write("herpderp")
+ input.seek(0)
+
+ out = cat(_in=input)
+ self.assertEqual(out, "herpderp")
+
+
+ def test_internal_bufsize(self):
+ from sh import cat
+
+ output = cat(_in="a"*1000, _internal_bufsize=100, _out_bufsize=0)
+ self.assertEqual(len(output), 100)
+
+ output = cat(_in="a"*1000, _internal_bufsize=50, _out_bufsize=2)
+ self.assertEqual(len(output), 100)
+
+
+ def test_change_stdout_buffering(self):
+ py = create_tmp_test("""
+import sys
+import os
+
+# this proves that we won't get the output into our callback until we send
+# a newline
+sys.stdout.write("switch ")
+sys.stdout.flush()
+sys.stdout.write("buffering\\n")
+sys.stdout.flush()
+
+sys.stdin.read(1)
+sys.stdout.write("unbuffered")
+sys.stdout.flush()
+
+# this is to keep the output from being flushed by the process ending, which
+# would ruin our test. we want to make sure we get the string "unbuffered"
+# before the process ends, without writing a newline
+sys.stdin.read(1)
+""")
+
+ d = {"success": False}
+ def interact(line, stdin, process):
+ line = line.strip()
+ if not line: return
+
+ if line == "switch buffering":
+ process.out_bufsize(0)
+ stdin.put("a")
+
+ elif line == "unbuffered":
+ stdin.put("b")
+ d["success"] = True
+ return True
+
+ # start with line buffered stdout
+ pw_stars = python(py.name, _out=interact, _out_bufsize=1, u=True)
+ pw_stars.wait()
+
+ self.assertTrue(d["success"])
+
+
+
+ def test_encoding(self):
+ return
+ raise NotImplementedError("what's the best way to test a different \
+'_encoding' special keyword argument?")
+
+
+ def test_timeout(self):
+ from sh import sleep
+ from time import time
+
+ # check that a normal sleep is more or less how long the whole process
+ # takes
+ sleep_for = 3
+ started = time()
+ sh.sleep(sleep_for).wait()
+ elapsed = time() - started
+
+ self.assertTrue(abs(elapsed - sleep_for) < 0.1)
+
+ # now make sure that killing early makes the process take less time
+ sleep_for = 3
+ timeout = 1
+ started = time()
+ sh.sleep(sleep_for, _timeout=timeout).wait()
+ elapsed = time() - started
+ self.assertTrue(abs(elapsed - timeout) < 0.1)
+
+
+ def test_binary_pipe(self):
+ binary = b'\xec;\xedr\xdbF\x92\xf9\x8d\xa7\x98\x02/\x15\xd2K\xc3\x94d\xc9'
+
+ py1 = create_tmp_test("""
+import sys
+import os
+
+sys.stdout = os.fdopen(sys.stdout.fileno(), "wb", 0)
+sys.stdout.write(%r)
+""" % binary)
+
+ py2 = create_tmp_test("""
+import sys
+import os
+
+sys.stdin = os.fdopen(sys.stdin.fileno(), "rb", 0)
+sys.stdout = os.fdopen(sys.stdout.fileno(), "wb", 0)
+sys.stdout.write(sys.stdin.read())
+""")
+ out = python(python(py1.name), py2.name)
+ self.assertEqual(out.stdout, binary)
+
+
+ def test_auto_change_buffering(self):
+ binary = b'\xec;\xedr\xdbF\x92\xf9\x8d\xa7\x98\x02/\x15\xd2K\xc3\x94d\xc9'
+ py1 = create_tmp_test("""
+import sys
+import os
+import time
+
+sys.stdout = os.fdopen(sys.stdout.fileno(), "wb", 0)
+sys.stdout.write(b"testing")
+sys.stdout.flush()
+# to ensure that sh's select loop picks up the write before we write again
+time.sleep(0.5)
+sys.stdout.write(b"again\\n")
+sys.stdout.flush()
+time.sleep(0.5)
+sys.stdout.write(%r)
+sys.stdout.flush()
+""" % binary)
+
+ out = python(py1.name, _out_bufsize=1)
+ self.assertTrue(out.stdout == b'testingagain\n\xec;\xedr\xdbF\x92\xf9\x8d\xa7\x98\x02/\x15\xd2K\xc3\x94d\xc9')
+
+
+ # designed to trigger the "... (%d more, please see e.stdout)" output
+ # of the ErrorReturnCode class
+ def test_failure_with_large_output(self):
+ from sh import ErrorReturnCode_1
+
+ py = create_tmp_test("""
+print("andrewmoffat" * 1000)
+exit(1)
+""")
+ self.assertRaises(ErrorReturnCode_1, python, py.name)
+
+ # designed to check if the ErrorReturnCode constructor does not raise
+ # an UnicodeDecodeError
+ def test_non_ascii_error(self):
+ from sh import ls, ErrorReturnCode
+
+ test = "/á"
+ if not IS_PY3: test = test.decode("utf8")
+
+ self.assertRaises(ErrorReturnCode, ls, test)
+
+
+ def test_no_out(self):
+ py = create_tmp_test("""
+import sys
+sys.stdout.write("stdout")
+sys.stderr.write("stderr")
+""")
+ p = python(py.name, _no_out=True)
+ self.assertEqual(p.stdout, b"")
+ self.assertEqual(p.stderr, b"stderr")
+ self.assertTrue(p.process._pipe_queue.empty())
+
+ def callback(line): pass
+ p = python(py.name, _out=callback)
+ self.assertEqual(p.stdout, b"")
+ self.assertEqual(p.stderr, b"stderr")
+ self.assertTrue(p.process._pipe_queue.empty())
+
+ p = python(py.name)
+ self.assertEqual(p.stdout, b"stdout")
+ self.assertEqual(p.stderr, b"stderr")
+ self.assertFalse(p.process._pipe_queue.empty())
+
+
+ def test_no_err(self):
+ py = create_tmp_test("""
+import sys
+sys.stdout.write("stdout")
+sys.stderr.write("stderr")
+""")
+ p = python(py.name, _no_err=True)
+ self.assertEqual(p.stderr, b"")
+ self.assertEqual(p.stdout, b"stdout")
+ self.assertFalse(p.process._pipe_queue.empty())
+
+ def callback(line): pass
+ p = python(py.name, _err=callback)
+ self.assertEqual(p.stderr, b"")
+ self.assertEqual(p.stdout, b"stdout")
+ self.assertFalse(p.process._pipe_queue.empty())
+
+ p = python(py.name)
+ self.assertEqual(p.stderr, b"stderr")
+ self.assertEqual(p.stdout, b"stdout")
+ self.assertFalse(p.process._pipe_queue.empty())
+
+
+ def test_no_pipe(self):
+ from sh import ls
+
+ p = ls()
+ self.assertFalse(p.process._pipe_queue.empty())
+
+ def callback(line): pass
+ p = ls(_out=callback)
+ self.assertTrue(p.process._pipe_queue.empty())
+
+ p = ls(_no_pipe=True)
+ self.assertTrue(p.process._pipe_queue.empty())
+
+
+ def test_decode_error_handling(self):
+ from functools import partial
+
+ py = create_tmp_test("""
+# -*- coding: utf8 -*-
+import sys
+sys.stdout.write("te漢字st")
+""")
+ fn = partial(python, py.name, _encoding="ascii")
+ def s(fn): str(fn())
+ self.assertRaises(UnicodeDecodeError, s, fn)
+
+ p = python(py.name, _encoding="ascii", _decode_errors="ignore")
+ self.assertEqual(p, "test")
+
+
+if __name__ == "__main__":
+ if len(sys.argv) > 1:
+ unittest.main()
+ else:
+ suite = unittest.TestLoader().loadTestsFromTestCase(Basic)
+ unittest.TextTestRunner(verbosity=2).run(suite)