diff options
Diffstat (limited to 'tests/test_mgminterface.py')
-rw-r--r-- | tests/test_mgminterface.py | 333 |
1 files changed, 333 insertions, 0 deletions
diff --git a/tests/test_mgminterface.py b/tests/test_mgminterface.py new file mode 100644 index 00000000..398b7f36 --- /dev/null +++ b/tests/test_mgminterface.py @@ -0,0 +1,333 @@ +import socket +import select +import telnetlib +import contextlib + +from unittest import TestCase + +import support + +from eip_client.vpnmanager import OpenVPNManager + +HOST = "localhost" + + +class SocketStub(object): + ''' a socket proxy that re-defines sendall() ''' + def __init__(self, reads=[]): + self.reads = reads + self.writes = [] + self.block = False + + def sendall(self, data): + self.writes.append(data) + + def recv(self, size): + out = b'' + while self.reads and len(out) < size: + out += self.reads.pop(0) + #print(out) + if len(out) > size: + self.reads.insert(0, out[size:]) + out = out[:size] + return out + + +class TelnetAlike(telnetlib.Telnet): + def fileno(self): + raise NotImplementedError() + + def close(self): + pass + + def sock_avail(self): + return (not self.sock.block) + + def msg(self, msg, *args): + with support.captured_stdout() as out: + telnetlib.Telnet.msg(self, msg, *args) + self._messages += out.getvalue() + return + + def read_very_lazy(self): + self.fill_rawq() + _all = self.read_all() + print 'faking lazy:', _all + return _all + + +def new_select(*s_args): + block = False + for l in s_args: + for fob in l: + if isinstance(fob, TelnetAlike): + block = fob.sock.block + if block: + return [[], [], []] + else: + return s_args + + +@contextlib.contextmanager +def test_socket(reads): + def new_conn(*ignored): + return SocketStub(reads) + try: + old_conn = socket.create_connection + socket.create_connection = new_conn + yield None + finally: + socket.create_connection = old_conn + return + + +# +# VPN Commands Dict +# + +vpn_commands = { + 'status': [ + 'OpenVPN STATISTICS', 'Updated,Mon Jun 25 11:51:21 2012', + 'TUN/TAP read bytes,306170', 'TUN/TAP write bytes,872102', + 'TCP/UDP read bytes,986177', 'TCP/UDP write bytes,439329', + 'Auth read bytes,872102'], + 'state': ['1340616463,CONNECTED,SUCCESS,172.28.0.2,198.252.153.38'], + # XXX add more tests + } + + +class VPNManagementStub(TelnetAlike): + epilogue = "\nEND\n" + + def write(self, data): + #print('data written') + data = data[:-1] + if data not in vpn_commands: + print('not in commands') + telnetlib.Telnet.write(self, data) + else: + msg = '\n'.join(vpn_commands[data]) + self.epilogue + print 'writing...' + print msg + for line in vpn_commands[data]: + self.sock.reads.append(line) + #telnetlib.Telnet.write(self, line) + self.sock.reads.append(self.epilogue) + #telnetlib.Telnet.write(self, self.epilogue) + + +def test_telnet(reads=[], cls=VPNManagementStub): + ''' return a telnetlib.Telnet object that uses a SocketStub with + reads queued up to be read, and write method mocking a vpn + management interface''' + for x in reads: + assert type(x) is bytes, x + with test_socket(reads): + telnet = cls('dummy', 0) + telnet._messages = '' # debuglevel output + return telnet + + +class ReadTests(TestCase): + def setUp(self): + self.old_select = select.select + select.select = new_select + + def tearDown(self): + select.select = self.old_select + + def test_read_until(self): + """ + read_until(expected, timeout=None) + test the blocking version of read_util + """ + want = [b'xxxmatchyyy'] + telnet = test_telnet(want) + data = telnet.read_until(b'match') + self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, + telnet.rawq, telnet.sock.reads)) + + reads = [b'x' * 50, b'match', b'y' * 50] + expect = b''.join(reads[:-1]) + telnet = test_telnet(reads) + data = telnet.read_until(b'match') + self.assertEqual(data, expect) + + def test_read_all(self): + """ + read_all() + Read all data until EOF; may block. + """ + reads = [b'x' * 500, b'y' * 500, b'z' * 500] + expect = b''.join(reads) + telnet = test_telnet(reads) + data = telnet.read_all() + self.assertEqual(data, expect) + return + + def test_read_some(self): + """ + read_some() + Read at least one byte or EOF; may block. + """ + # test 'at least one byte' + telnet = test_telnet([b'x' * 500]) + data = telnet.read_some() + self.assertTrue(len(data) >= 1) + # test EOF + telnet = test_telnet() + data = telnet.read_some() + self.assertEqual(b'', data) + + def _read_eager(self, func_name): + """ + read_*_eager() + Read all data available already queued or on the socket, + without blocking. + """ + want = b'x' * 100 + telnet = test_telnet([want]) + func = getattr(telnet, func_name) + telnet.sock.block = True + self.assertEqual(b'', func()) + telnet.sock.block = False + data = b'' + while True: + try: + data += func() + except EOFError: + break + self.assertEqual(data, want) + + def test_read_eager(self): + # read_eager and read_very_eager make the same gaurantees + # (they behave differently but we only test the gaurantees) + self._read_eager('read_eager') + self._read_eager('read_very_eager') + #self._read_eager('read_very_lazy') + # NB -- we need to test the IAC block which is mentioned in the + # docstring but not in the module docs + + def read_very_lazy(self): + want = b'x' * 100 + telnet = test_telnet([want]) + self.assertEqual(b'', telnet.read_very_lazy()) + while telnet.sock.reads: + telnet.fill_rawq() + data = telnet.read_very_lazy() + self.assertEqual(want, data) + self.assertRaises(EOFError, telnet.read_very_lazy) + + def test_read_lazy(self): + want = b'x' * 100 + telnet = test_telnet([want]) + self.assertEqual(b'', telnet.read_lazy()) + data = b'' + while True: + try: + read_data = telnet.read_lazy() + data += read_data + if not read_data: + telnet.fill_rawq() + except EOFError: + break + self.assertTrue(want.startswith(data)) + self.assertEqual(data, want) + + +def _seek_to_eof(self): + """ + Read as much as available. Position seek pointer to end of stream + """ + #import ipdb;ipdb.set_trace() + while self.tn.sock.reads: + print 'reading...' + print 'and filling rawq' + self.tn.fill_rawq() + self.tn.process_rawq() + try: + b = self.tn.read_eager() + while b: + b = self.tn.read_eager() + except EOFError: + pass + + +def connect_to_stub(self): + """ + stub to be added to manager + """ + try: + self.close() + except: + pass + if self.connected(): + return True + self.tn = test_telnet() + + self._seek_to_eof() + return True + + + +class VPNManagerTests(TestCase): + + def setUp(self): + self.old_select = select.select + select.select = new_select + + patched_manager = OpenVPNManager + patched_manager._seek_to_eof = _seek_to_eof + patched_manager.connect = connect_to_stub + self.manager = patched_manager() + + def tearDown(self): + select.select = self.old_select + + # tests + + + #def test_read_very_lazy(self): + #want = b'x' * 100 + #telnet = test_telnet() + #self.assertEqual(b'', telnet.read_very_lazy()) + #print 'writing to telnet' + #telnet.write('status\n') + #import ipdb;ipdb.set_trace() + #while telnet.sock.reads: + #print 'reading...' + #print 'and filling rawq' + #telnet.fill_rawq() + #import ipdb;ipdb.set_trace() + #data = telnet.read_very_lazy() + #print 'data ->', data + + #def test_manager_status(self): + #buf = self.manager._send_command('state') + #import ipdb;ipdb.set_trace() + #print 'buf-->' + #print buf +# + def test_manager_state(self): + buf = self.manager.state() + print 'buf-->' + print buf + import ipdb;ipdb.set_trace() + + def test_command(self): + commands = [b'status'] + for com in commands: + telnet = test_telnet() + telnet.write(com) + buf = telnet.read_until(b'END') + print 'buf ' + print buf + + +def test_main(verbose=None): + support.run_unittest( + #ReadTests, + VPNManagerTests) + +if __name__ == '__main__': + test_main() |