diff options
Diffstat (limited to 'tests/test_mgminterface.py')
| -rw-r--r-- | tests/test_mgminterface.py | 333 | 
1 files changed, 0 insertions, 333 deletions
| diff --git a/tests/test_mgminterface.py b/tests/test_mgminterface.py deleted file mode 100644 index 398b7f36..00000000 --- a/tests/test_mgminterface.py +++ /dev/null @@ -1,333 +0,0 @@ -import socket -import select -import telnetlib -import contextlib - -from unittest import TestCase - -import support - -from eip_client.vpnmanager import OpenVPNManager - -HOST = "localhost" - - -class SocketStub(object): -    ''' a socket proxy that re-defines sendall() ''' -    def __init__(self, reads=[]): -        self.reads = reads -        self.writes = [] -        self.block = False - -    def sendall(self, data): -        self.writes.append(data) - -    def recv(self, size): -        out = b'' -        while self.reads and len(out) < size: -            out += self.reads.pop(0) -            #print(out) -        if len(out) > size: -            self.reads.insert(0, out[size:]) -            out = out[:size] -        return out - - -class TelnetAlike(telnetlib.Telnet): -    def fileno(self): -        raise NotImplementedError() - -    def close(self): -        pass - -    def sock_avail(self): -        return (not self.sock.block) - -    def msg(self, msg, *args): -        with support.captured_stdout() as out: -            telnetlib.Telnet.msg(self, msg, *args) -        self._messages += out.getvalue() -        return - -    def read_very_lazy(self): -        self.fill_rawq() -        _all = self.read_all() -        print 'faking lazy:', _all -        return _all - - -def new_select(*s_args): -    block = False -    for l in s_args: -        for fob in l: -            if isinstance(fob, TelnetAlike): -                block = fob.sock.block -    if block: -        return [[], [], []] -    else: -        return s_args - - -@contextlib.contextmanager -def test_socket(reads): -    def new_conn(*ignored): -        return SocketStub(reads) -    try: -        old_conn = socket.create_connection -        socket.create_connection = new_conn -        yield None -    finally: -        socket.create_connection = old_conn -    return - - -# -# VPN Commands Dict -# - -vpn_commands = { -        'status': [ -            'OpenVPN STATISTICS', 'Updated,Mon Jun 25 11:51:21 2012', -            'TUN/TAP read bytes,306170', 'TUN/TAP write bytes,872102', -            'TCP/UDP read bytes,986177', 'TCP/UDP write bytes,439329', -            'Auth read bytes,872102'], -        'state': ['1340616463,CONNECTED,SUCCESS,172.28.0.2,198.252.153.38'], -        # XXX add more tests -        } - - -class VPNManagementStub(TelnetAlike): -    epilogue = "\nEND\n" - -    def write(self, data): -        #print('data written') -        data = data[:-1] -        if data not in vpn_commands: -            print('not in commands') -            telnetlib.Telnet.write(self, data) -        else: -            msg = '\n'.join(vpn_commands[data]) + self.epilogue -            print 'writing...' -            print msg -            for line in vpn_commands[data]: -                self.sock.reads.append(line) -                #telnetlib.Telnet.write(self, line) -            self.sock.reads.append(self.epilogue) -            #telnetlib.Telnet.write(self, self.epilogue) - - -def test_telnet(reads=[], cls=VPNManagementStub): -    ''' return a telnetlib.Telnet object that uses a SocketStub with -        reads queued up to be read, and write method mocking a vpn -        management interface''' -    for x in reads: -        assert type(x) is bytes, x -    with test_socket(reads): -        telnet = cls('dummy', 0) -        telnet._messages = ''  # debuglevel output -    return telnet - - -class ReadTests(TestCase): -    def setUp(self): -        self.old_select = select.select -        select.select = new_select - -    def tearDown(self): -        select.select = self.old_select - -    def test_read_until(self): -        """ -        read_until(expected, timeout=None) -        test the blocking version of read_util -        """ -        want = [b'xxxmatchyyy'] -        telnet = test_telnet(want) -        data = telnet.read_until(b'match') -        self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, -            telnet.rawq, telnet.sock.reads)) - -        reads = [b'x' * 50, b'match', b'y' * 50] -        expect = b''.join(reads[:-1]) -        telnet = test_telnet(reads) -        data = telnet.read_until(b'match') -        self.assertEqual(data, expect) - -    def test_read_all(self): -        """ -        read_all() -          Read all data until EOF; may block. -        """ -        reads = [b'x' * 500, b'y' * 500, b'z' * 500] -        expect = b''.join(reads) -        telnet = test_telnet(reads) -        data = telnet.read_all() -        self.assertEqual(data, expect) -        return - -    def test_read_some(self): -        """ -        read_some() -          Read at least one byte or EOF; may block. -        """ -        # test 'at least one byte' -        telnet = test_telnet([b'x' * 500]) -        data = telnet.read_some() -        self.assertTrue(len(data) >= 1) -        # test EOF -        telnet = test_telnet() -        data = telnet.read_some() -        self.assertEqual(b'', data) - -    def _read_eager(self, func_name): -        """ -        read_*_eager() -          Read all data available already queued or on the socket, -          without blocking. -        """ -        want = b'x' * 100 -        telnet = test_telnet([want]) -        func = getattr(telnet, func_name) -        telnet.sock.block = True -        self.assertEqual(b'', func()) -        telnet.sock.block = False -        data = b'' -        while True: -            try: -                data += func() -            except EOFError: -                break -        self.assertEqual(data, want) - -    def test_read_eager(self): -        # read_eager and read_very_eager make the same gaurantees -        # (they behave differently but we only test the gaurantees) -        self._read_eager('read_eager') -        self._read_eager('read_very_eager') -        #self._read_eager('read_very_lazy') -        # NB -- we need to test the IAC block which is mentioned in the -        # docstring but not in the module docs - -    def read_very_lazy(self): -        want = b'x' * 100 -        telnet = test_telnet([want]) -        self.assertEqual(b'', telnet.read_very_lazy()) -        while telnet.sock.reads: -            telnet.fill_rawq() -        data = telnet.read_very_lazy() -        self.assertEqual(want, data) -        self.assertRaises(EOFError, telnet.read_very_lazy) - -    def test_read_lazy(self): -        want = b'x' * 100 -        telnet = test_telnet([want]) -        self.assertEqual(b'', telnet.read_lazy()) -        data = b'' -        while True: -            try: -                read_data = telnet.read_lazy() -                data += read_data -                if not read_data: -                    telnet.fill_rawq() -            except EOFError: -                break -            self.assertTrue(want.startswith(data)) -        self.assertEqual(data, want) - - -def _seek_to_eof(self): -    """ -    Read as much as available. Position seek pointer to end of stream -    """ -    #import ipdb;ipdb.set_trace() -    while self.tn.sock.reads: -        print 'reading...' -        print 'and filling rawq' -        self.tn.fill_rawq() -        self.tn.process_rawq() -    try: -        b = self.tn.read_eager() -        while b: -            b = self.tn.read_eager() -    except EOFError: -        pass - - -def connect_to_stub(self): -    """ -    stub to be added to manager -    """ -    try: -        self.close() -    except: -        pass -    if self.connected(): -        return True -    self.tn = test_telnet() - -    self._seek_to_eof() -    return True - - - -class VPNManagerTests(TestCase): - -    def setUp(self): -        self.old_select = select.select -        select.select = new_select - -        patched_manager = OpenVPNManager -        patched_manager._seek_to_eof = _seek_to_eof -        patched_manager.connect = connect_to_stub -        self.manager = patched_manager() - -    def tearDown(self): -        select.select = self.old_select - -    # tests - -     -    #def test_read_very_lazy(self): -        #want = b'x' * 100 -        #telnet = test_telnet() -        #self.assertEqual(b'', telnet.read_very_lazy()) -        #print 'writing to telnet' -        #telnet.write('status\n') -        #import ipdb;ipdb.set_trace() -        #while telnet.sock.reads: -            #print 'reading...' -            #print 'and filling rawq' -            #telnet.fill_rawq() -        #import ipdb;ipdb.set_trace() -        #data = telnet.read_very_lazy() -        #print 'data ->', data - -    #def test_manager_status(self): -        #buf = self.manager._send_command('state') -        #import ipdb;ipdb.set_trace() -        #print 'buf-->' -        #print buf -# -    def test_manager_state(self): -        buf = self.manager.state() -        print 'buf-->' -        print buf -        import ipdb;ipdb.set_trace() - -    def test_command(self): -        commands = [b'status'] -        for com in commands: -            telnet = test_telnet() -            telnet.write(com) -            buf = telnet.read_until(b'END') -            print 'buf ' -            print buf - - -def test_main(verbose=None): -    support.run_unittest( -            #ReadTests, -            VPNManagerTests) - -if __name__ == '__main__': -    test_main() | 
