summaryrefslogtreecommitdiff
path: root/tests/test_mgminterface.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_mgminterface.py')
-rw-r--r--tests/test_mgminterface.py333
1 files changed, 333 insertions, 0 deletions
diff --git a/tests/test_mgminterface.py b/tests/test_mgminterface.py
new file mode 100644
index 00000000..398b7f36
--- /dev/null
+++ b/tests/test_mgminterface.py
@@ -0,0 +1,333 @@
+import socket
+import select
+import telnetlib
+import contextlib
+
+from unittest import TestCase
+
+import support
+
+from eip_client.vpnmanager import OpenVPNManager
+
+HOST = "localhost"
+
+
+class SocketStub(object):
+ ''' a socket proxy that re-defines sendall() '''
+ def __init__(self, reads=[]):
+ self.reads = reads
+ self.writes = []
+ self.block = False
+
+ def sendall(self, data):
+ self.writes.append(data)
+
+ def recv(self, size):
+ out = b''
+ while self.reads and len(out) < size:
+ out += self.reads.pop(0)
+ #print(out)
+ if len(out) > size:
+ self.reads.insert(0, out[size:])
+ out = out[:size]
+ return out
+
+
+class TelnetAlike(telnetlib.Telnet):
+ def fileno(self):
+ raise NotImplementedError()
+
+ def close(self):
+ pass
+
+ def sock_avail(self):
+ return (not self.sock.block)
+
+ def msg(self, msg, *args):
+ with support.captured_stdout() as out:
+ telnetlib.Telnet.msg(self, msg, *args)
+ self._messages += out.getvalue()
+ return
+
+ def read_very_lazy(self):
+ self.fill_rawq()
+ _all = self.read_all()
+ print 'faking lazy:', _all
+ return _all
+
+
+def new_select(*s_args):
+ block = False
+ for l in s_args:
+ for fob in l:
+ if isinstance(fob, TelnetAlike):
+ block = fob.sock.block
+ if block:
+ return [[], [], []]
+ else:
+ return s_args
+
+
+@contextlib.contextmanager
+def test_socket(reads):
+ def new_conn(*ignored):
+ return SocketStub(reads)
+ try:
+ old_conn = socket.create_connection
+ socket.create_connection = new_conn
+ yield None
+ finally:
+ socket.create_connection = old_conn
+ return
+
+
+#
+# VPN Commands Dict
+#
+
+vpn_commands = {
+ 'status': [
+ 'OpenVPN STATISTICS', 'Updated,Mon Jun 25 11:51:21 2012',
+ 'TUN/TAP read bytes,306170', 'TUN/TAP write bytes,872102',
+ 'TCP/UDP read bytes,986177', 'TCP/UDP write bytes,439329',
+ 'Auth read bytes,872102'],
+ 'state': ['1340616463,CONNECTED,SUCCESS,172.28.0.2,198.252.153.38'],
+ # XXX add more tests
+ }
+
+
+class VPNManagementStub(TelnetAlike):
+ epilogue = "\nEND\n"
+
+ def write(self, data):
+ #print('data written')
+ data = data[:-1]
+ if data not in vpn_commands:
+ print('not in commands')
+ telnetlib.Telnet.write(self, data)
+ else:
+ msg = '\n'.join(vpn_commands[data]) + self.epilogue
+ print 'writing...'
+ print msg
+ for line in vpn_commands[data]:
+ self.sock.reads.append(line)
+ #telnetlib.Telnet.write(self, line)
+ self.sock.reads.append(self.epilogue)
+ #telnetlib.Telnet.write(self, self.epilogue)
+
+
+def test_telnet(reads=[], cls=VPNManagementStub):
+ ''' return a telnetlib.Telnet object that uses a SocketStub with
+ reads queued up to be read, and write method mocking a vpn
+ management interface'''
+ for x in reads:
+ assert type(x) is bytes, x
+ with test_socket(reads):
+ telnet = cls('dummy', 0)
+ telnet._messages = '' # debuglevel output
+ return telnet
+
+
+class ReadTests(TestCase):
+ def setUp(self):
+ self.old_select = select.select
+ select.select = new_select
+
+ def tearDown(self):
+ select.select = self.old_select
+
+ def test_read_until(self):
+ """
+ read_until(expected, timeout=None)
+ test the blocking version of read_util
+ """
+ want = [b'xxxmatchyyy']
+ telnet = test_telnet(want)
+ data = telnet.read_until(b'match')
+ self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq,
+ telnet.rawq, telnet.sock.reads))
+
+ reads = [b'x' * 50, b'match', b'y' * 50]
+ expect = b''.join(reads[:-1])
+ telnet = test_telnet(reads)
+ data = telnet.read_until(b'match')
+ self.assertEqual(data, expect)
+
+ def test_read_all(self):
+ """
+ read_all()
+ Read all data until EOF; may block.
+ """
+ reads = [b'x' * 500, b'y' * 500, b'z' * 500]
+ expect = b''.join(reads)
+ telnet = test_telnet(reads)
+ data = telnet.read_all()
+ self.assertEqual(data, expect)
+ return
+
+ def test_read_some(self):
+ """
+ read_some()
+ Read at least one byte or EOF; may block.
+ """
+ # test 'at least one byte'
+ telnet = test_telnet([b'x' * 500])
+ data = telnet.read_some()
+ self.assertTrue(len(data) >= 1)
+ # test EOF
+ telnet = test_telnet()
+ data = telnet.read_some()
+ self.assertEqual(b'', data)
+
+ def _read_eager(self, func_name):
+ """
+ read_*_eager()
+ Read all data available already queued or on the socket,
+ without blocking.
+ """
+ want = b'x' * 100
+ telnet = test_telnet([want])
+ func = getattr(telnet, func_name)
+ telnet.sock.block = True
+ self.assertEqual(b'', func())
+ telnet.sock.block = False
+ data = b''
+ while True:
+ try:
+ data += func()
+ except EOFError:
+ break
+ self.assertEqual(data, want)
+
+ def test_read_eager(self):
+ # read_eager and read_very_eager make the same gaurantees
+ # (they behave differently but we only test the gaurantees)
+ self._read_eager('read_eager')
+ self._read_eager('read_very_eager')
+ #self._read_eager('read_very_lazy')
+ # NB -- we need to test the IAC block which is mentioned in the
+ # docstring but not in the module docs
+
+ def read_very_lazy(self):
+ want = b'x' * 100
+ telnet = test_telnet([want])
+ self.assertEqual(b'', telnet.read_very_lazy())
+ while telnet.sock.reads:
+ telnet.fill_rawq()
+ data = telnet.read_very_lazy()
+ self.assertEqual(want, data)
+ self.assertRaises(EOFError, telnet.read_very_lazy)
+
+ def test_read_lazy(self):
+ want = b'x' * 100
+ telnet = test_telnet([want])
+ self.assertEqual(b'', telnet.read_lazy())
+ data = b''
+ while True:
+ try:
+ read_data = telnet.read_lazy()
+ data += read_data
+ if not read_data:
+ telnet.fill_rawq()
+ except EOFError:
+ break
+ self.assertTrue(want.startswith(data))
+ self.assertEqual(data, want)
+
+
+def _seek_to_eof(self):
+ """
+ Read as much as available. Position seek pointer to end of stream
+ """
+ #import ipdb;ipdb.set_trace()
+ while self.tn.sock.reads:
+ print 'reading...'
+ print 'and filling rawq'
+ self.tn.fill_rawq()
+ self.tn.process_rawq()
+ try:
+ b = self.tn.read_eager()
+ while b:
+ b = self.tn.read_eager()
+ except EOFError:
+ pass
+
+
+def connect_to_stub(self):
+ """
+ stub to be added to manager
+ """
+ try:
+ self.close()
+ except:
+ pass
+ if self.connected():
+ return True
+ self.tn = test_telnet()
+
+ self._seek_to_eof()
+ return True
+
+
+
+class VPNManagerTests(TestCase):
+
+ def setUp(self):
+ self.old_select = select.select
+ select.select = new_select
+
+ patched_manager = OpenVPNManager
+ patched_manager._seek_to_eof = _seek_to_eof
+ patched_manager.connect = connect_to_stub
+ self.manager = patched_manager()
+
+ def tearDown(self):
+ select.select = self.old_select
+
+ # tests
+
+
+ #def test_read_very_lazy(self):
+ #want = b'x' * 100
+ #telnet = test_telnet()
+ #self.assertEqual(b'', telnet.read_very_lazy())
+ #print 'writing to telnet'
+ #telnet.write('status\n')
+ #import ipdb;ipdb.set_trace()
+ #while telnet.sock.reads:
+ #print 'reading...'
+ #print 'and filling rawq'
+ #telnet.fill_rawq()
+ #import ipdb;ipdb.set_trace()
+ #data = telnet.read_very_lazy()
+ #print 'data ->', data
+
+ #def test_manager_status(self):
+ #buf = self.manager._send_command('state')
+ #import ipdb;ipdb.set_trace()
+ #print 'buf-->'
+ #print buf
+#
+ def test_manager_state(self):
+ buf = self.manager.state()
+ print 'buf-->'
+ print buf
+ import ipdb;ipdb.set_trace()
+
+ def test_command(self):
+ commands = [b'status']
+ for com in commands:
+ telnet = test_telnet()
+ telnet.write(com)
+ buf = telnet.read_until(b'END')
+ print 'buf '
+ print buf
+
+
+def test_main(verbose=None):
+ support.run_unittest(
+ #ReadTests,
+ VPNManagerTests)
+
+if __name__ == '__main__':
+ test_main()