diff options
Diffstat (limited to 'test/api_tests.py')
-rw-r--r-- | test/api_tests.py | 524 |
1 files changed, 524 insertions, 0 deletions
diff --git a/test/api_tests.py b/test/api_tests.py new file mode 100644 index 0000000..78f7c7f --- /dev/null +++ b/test/api_tests.py @@ -0,0 +1,524 @@ +#!/usr/bin/env python +import testsupport +import os, string, sys, types, unittest, weakref +import sqlite + +class DBAPICompliance(unittest.TestCase): + def CheckAPILevel(self): + self.assertEqual(sqlite.apilevel, '2.0', + 'apilevel is %s, should be 2.0' % sqlite.apilevel) + + def CheckThreadSafety(self): + self.assertEqual(sqlite.threadsafety, 1, + 'threadsafety is %d, should be 1' % sqlite.threadsafety) + + def CheckParamStyle(self): + self.assertEqual(sqlite.paramstyle, 'pyformat', + 'paramstyle is "%s", should be "pyformat"' % + sqlite.paramstyle) + + def CheckWarning(self): + self.assert_(issubclass(sqlite.Warning, StandardError), + 'Warning is not a subclass of StandardError') + + def CheckError(self): + self.failUnless(issubclass(sqlite.Error, StandardError), + 'Error is not a subclass of StandardError') + + def CheckInterfaceError(self): + self.failUnless(issubclass(sqlite.InterfaceError, sqlite.Error), + 'InterfaceError is not a subclass of Error') + + def CheckDatabaseError(self): + self.failUnless(issubclass(sqlite.DatabaseError, sqlite.Error), + 'DatabaseError is not a subclass of Error') + + def CheckDataError(self): + self.failUnless(issubclass(sqlite.DataError, sqlite.DatabaseError), + 'DataError is not a subclass of DatabaseError') + + def CheckOperationalError(self): + self.failUnless(issubclass(sqlite.OperationalError, sqlite.DatabaseError), + 'OperationalError is not a subclass of DatabaseError') + + def CheckIntegrityError(self): + self.failUnless(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), + 'IntegrityError is not a subclass of DatabaseError') + + def CheckInternalError(self): + self.failUnless(issubclass(sqlite.InternalError, sqlite.DatabaseError), + 'InternalError is not a subclass of DatabaseError') + + def CheckProgrammingError(self): + self.failUnless(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), + 'ProgrammingError is not a subclass of DatabaseError') + + def CheckNotSupportedError(self): + self.failUnless(issubclass(sqlite.NotSupportedError, + sqlite.DatabaseError), + 'NotSupportedError is not a subclass of DatabaseError') + +class moduleTestCases(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename) + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.ProgrammingError: + pass + + def CheckConnectionObject(self): + self.assert_(isinstance(self.cnx, sqlite.Connection), + 'sqlite.connect did not return a Connection object') + + def CheckConnectionClose(self): + self.assert_(hasattr(self.cnx, 'close') and + type(self.cnx.close) == types.MethodType, + 'close is not a method of Connection') + self.cnx.close() + self.removefile() + self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.close) + + def CheckConnectionCommit(self): + self.assert_(hasattr(self.cnx, "commit") and + type(self.cnx.commit) == types.MethodType, + 'commit is not a method of Connection') + self.cnx.close() + self.removefile() + self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.commit) + + def CheckConnectionRollback(self): + self.assert_(hasattr(self.cnx, "rollback") and + type(self.cnx.rollback) == types.MethodType, + 'rollback is not a method of Connection') + self.cnx.close() + self.removefile() + self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.rollback) + + def CheckConnectionCursor(self): + self.assert_(hasattr(self.cnx, "cursor") and + type(self.cnx.cursor) == types.MethodType, + 'cursor is not a method of Connection') + self.cnx.close() + self.removefile() + self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.cursor) + + def CheckCloseConnection(self): + self.cnx.close() + self.removefile() + + def CheckCursorObject(self): + self.assert_(isinstance(self.cur, sqlite.Cursor), + 'cnx.cursor() did not return a Cursor instance') + + def CheckCursorArraysize(self): + self.assert_(self.cur.arraysize == 1, + 'cur.arraysize is %d, it should be 1' % + self.cur.arraysize) + + def CheckCursorDescription(self): + self.assert_(self.cur.description == None, + "cur.description should be None at this point, it isn't.") + + def CheckCursorDescriptionNoRow(self): + """ + cursor.description should at least provide the column name(s), even if + no row returned. + """ + self.cur.execute("create table test(a, b)") + self.cur.execute("select a, b from test") + self.assert_(self.cur.description[0][0] == "a") + self.assert_(self.cur.description[1][0] == "b") + + def CheckCursorRowcount(self): + self.assert_(self.cur.rowcount == -1, + 'cur.rowcount is %d, should be -1' % self.cur.rowcount) + + def CheckCursorClose(self): + self.assert_(hasattr(self.cur, "close") and + type(self.cur.close) == types.MethodType, + 'close is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, self.cur.close) + + def CheckCursorExecute(self): + self.assert_(hasattr(self.cur, "execute") and + type(self.cur.execute) == types.MethodType, + 'execute is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.execute, 'SELECT max(3,4)') + + def CheckCursorExecutemany(self): + self.assert_(hasattr(self.cur, "executemany") and + type(self.cur.executemany) == types.MethodType, + 'executemany is not a method of the Cursor object') + + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.executemany, 'SELECT max(3,4)', [1,2]) + + def CheckCursorFetchone(self): + self.assert_(hasattr(self.cur, "fetchone") and + type(self.cur.fetchone) == types.MethodType, + 'fetchone is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, self.cur.fetchone) + + def CheckCursorFetchMany(self): + self.failUnless(hasattr(self.cur, "fetchmany") and + type(self.cur.fetchmany) == types.MethodType, + 'fetchmany is not a method of the Cursor object') + + cursor = self.cnx.cursor() + cursor.execute("create table test(id int)") + cursor.executemany("insert into test(id) values (%s)", range(10)) + cursor.execute("select id from test") + res = cursor.fetchmany() + self.failUnlessEqual(len(res), 1, """fetchmany should have returned a + list of length 1, but the list was %i elements long""" % len(res)) + res = cursor.fetchmany(2) + self.failUnlessEqual(len(res), 2, """fetchmany should have returned a + list of length 2, but the list was %i elements long""" % len(res)) + cursor.arraysize = 5 + res = cursor.fetchmany() + self.failUnlessEqual(len(res), 5, """fetchmany should have returned a + list of length 5, but the list was %i elements long""" % len(res)) + + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.fetchmany, 10) + + def CheckCursorFetchall(self): + self.failUnless(hasattr(self.cur, "fetchall") and + type(self.cur.fetchall) == types.MethodType, + 'fetchall is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.fetchall) + + def CheckCursorSetoutputsize(self): + self.failUnless(hasattr(self.cur, "setoutputsize") and + type(self.cur.setoutputsize) == types.MethodType, + 'setoutputsize is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.setoutputsize, 1024) + + def CheckCursorSetinputsizes(self): + self.failUnless(hasattr(self.cur, "setinputsizes") and + type(self.cur.setinputsizes) == types.MethodType, + 'setinputsizes is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.setinputsizes, [1, 2, 3]) + + def CheckExecuteWithSingleton(self): + """Test execute() with a singleton string as the parameter.""" + try: + self.cur.execute("select max(3,4)") + except StandardError, msg: + self.fail(msg) + + self.assertEqual(type(self.cur.description), types.TupleType, + "cur.description should be a tuple, but isn't.") + + clen = len(self.cur.description) + self.assertEqual(clen, 1, + "Length of cur.description is %d, it should be %d." % + (clen, 1)) + + + self.assertEqual(len(self.cur.description[0]), 7, + "Length of cur.description[0] is %d, it should be 7." % + len(self.cur.description[0])) + + + self.failUnless(self.cur.description[0][0] == "max(3,4)" and + self.cur.description[0][1] == sqlite.NUMBER and + self.cur.description[0][2] == None and + self.cur.description[0][3] == None and + self.cur.description[0][4] == None and + self.cur.description[0][5] == None and + self.cur.description[0][6] == None, + "cur.description[0] does not match the query.") + self.cur.close() + + def CheckExecuteWithTuple(self): + """Test execute() with a tuple as the parameter.""" + try: + self.cur.execute("select max(%s, %s)", (4, 5)) + except StandardError, msg: + self.fail(msg) + + # Empty tuple + try: + self.cur.execute("select 3+4", ()) + except StandardError, msg: + self.fail(msg) + self.cur.close() + + def CheckExecuteWithDictionary(self): + """Test execute() with a dictionary as the parameter.""" + try: + self.cur.execute("select max(%(n1)s, %(n2)s)", {"n1": 5, "n2": 6}) + except StandardError, msg: + self.fail(msg) + self.cur.close() + + def CheckQuotingOfLong(self): + """Test wether longs are quoted properly for SQL.""" + try: + self.cur.execute("-- types long") + self.cur.execute("select %s + %s as x", (5L, 6L)) + except StandardError, msg: + self.fail(msg) + res = self.cur.fetchone() + self.failUnlessEqual(res.x, 11L, + "The addition of long should have returned %i, returned %i" + % (11L, res.x)) + + def CheckCursorIterator(self): + self.cur.execute("create table test (id, name)") + self.cur.executemany("insert into test (id) values (%s)", + [(1,), (2,), (3,)]) + self.cur.execute("-- types int") + self.cur.execute("select id from test") + + if sys.version_info[:2] >= (2,2): + counter = 0 + for row in self.cur: + if counter == 0: + self.failUnlessEqual(row.id, 1, + "row.id should have been 1, was %i" % row.id) + elif counter == 1: + self.failUnlessEqual(row.id, 2, + "row.id should have been 2, was %i" % row.id) + elif counter == 2: + self.failUnlessEqual(row.id, 3, + "row.id should have been 3, was %i" % row.id) + else: + self.fail("Iterated over too many rows.") + counter += 1 + else: + # Python 2.1 + counter = 0 + try: + while 1: + row = self.cur.next() + if counter == 0: + self.failUnlessEqual(row.id, 1, + "row.id should have been 1, was %i" % row.id) + elif counter == 1: + self.failUnlessEqual(row.id, 2, + "row.id should have been 2, was %i" % row.id) + elif counter == 2: + self.failUnlessEqual(row.id, 3, + "row.id should have been 3, was %i" % row.id) + else: + self.fail("Iterated over too many rows.") + counter += 1 + except IndexError: + pass + self.failUnlessEqual(counter, 3, + "Should have iterated over 3 items, was: %i" % counter) + + def CheckCursorScrollAndRownumber(self): + self.cur.execute("create table test (id, name)") + values = [("foo",)] * 20 + self.cur.executemany("insert into test (name) values (%s)", values) + self.cur.execute("select name from test") + self.failUnlessEqual(self.cur.rownumber, 0, + "Directly after execute, rownumber must be 0, is: %i" + % self.cur.rownumber) + + self.cur.scroll(1, "absolute") + self.cur.scroll(5, "absolute") + self.failUnlessEqual(self.cur.rownumber, 5, + "rownumber should be 5, is: %i" + % self.cur.rownumber) + + self.cur.scroll(1, "relative") + self.failUnlessEqual(self.cur.rownumber, 6, + "rownumber should be 6, is: %i" + % self.cur.rownumber) + + self.cur.scroll(-2, "relative") + self.failUnlessEqual(self.cur.rownumber, 4, + "rownumber should be 4, is: %i" + % self.cur.rownumber) + + self.failUnlessRaises(IndexError, self.cur.scroll, -2, "absolute") + self.failUnlessRaises(IndexError, self.cur.scroll, 1000, "absolute") + + self.cur.scroll(10, "absolute") + self.failUnlessRaises(IndexError, self.cur.scroll, -11, "relative") + + self.cur.scroll(10, "absolute") + self.failUnlessRaises(IndexError, self.cur.scroll, 30, "relative") + + def CheckCursorConnection(self): + if not isinstance(self.cur.connection, weakref.ProxyType) and \ + not isinstance(self.cur.connection, weakref.CallableProxyType): + fail("cursor.connection doesn't return the correct type") + + def CheckCursorLastRowID(self): + self.cur.execute("create table test (id integer primary key, name)") + + self.cur.execute("insert into test(name) values ('foo')") + self.failUnlessEqual(self.cur.lastrowid, 1, + "lastrowid should be 1, is %i" % self.cur.lastrowid) + + self.cur.execute("insert into test(name) values ('foo')") + self.failUnlessEqual(self.cur.lastrowid, 2, + "lastrowid should be 2, is %i" % self.cur.lastrowid) + + def CheckResultObject(self): + try: + self.cur.execute("select max(3,4)") + self.assertEqual(self.cur.rowcount, 1, + "cur.rowcount is %d, it should be 1." % + self.cur.rowcount) + self.res = self.cur.fetchall() + except StandardError, msg: + self.fail(msg) + + self.assertEqual(type(self.res), types.ListType, + 'cur.fetchall() did not return a sequence.') + + self.assertEqual(len(self.res), 1, + 'Length of the list of results is %d, it should be 1' % + len(self.res)) + + self.failUnless(isinstance(self.res[0], sqlite.PgResultSet), + 'cur.fetchall() did not return a list of PgResultSets.') + + def CheckResultFetchone(self): + try: + self.cur.execute("select max(3,4)") + self.res = self.cur.fetchone() + self.assertEqual(self.cur.rowcount, 1, + 'cur.rowcount is %d, it should be 1.' % + self.cur.rowcount) + except StandardError, msg: + self.fail(msg) + + self.failUnless(isinstance(self.res, sqlite.PgResultSet), + "cur.fetchone() does not return a PgResultSet.") + + try: + self.res = self.cur.fetchone() + self.assertEqual(self.res, None, + "res should be None at this point, but it isn't.") + except StandardError, msg: + self.fail(msg) + + def CheckRowCountAfterInsert(self): + try: + self.cur.execute("create table test(a)") + self.cur.execute("insert into test(a) values (5)") + self.assertEqual(self.cur.rowcount, 1, + 'cur.rowcount is %d, it should be 1.' % + self.cur.rowcount) + except StandardError, msg: + self.fail(msg) + + def CheckRowCountAfterUpdate(self): + try: + self.cur.execute("create table test(a, b)") + self.cur.execute("insert into test(a, b) values (1, 2)") + self.cur.execute("insert into test(a, b) values (1, 3)") + self.cur.execute("insert into test(a, b) values (1, 4)") + self.cur.execute("update test set b=1 where a=1") + self.assertEqual(self.cur.rowcount, 3, + 'cur.rowcount is %d, it should be 3.' % + self.cur.rowcount) + except StandardError, msg: + self.fail(msg) + + def CheckRowCountAfterDelete(self): + try: + self.cur.execute("create table test(a, b)") + self.cur.execute("insert into test(a, b) values (1, 2)") + self.cur.execute("insert into test(a, b) values (1, 3)") + self.cur.execute("insert into test(a, b) values (2, 4)") + self.cur.execute("delete from test where a=1") + self.assertEqual(self.cur.rowcount, 2, + 'cur.rowcount is %d, it should be 2.' % + self.cur.rowcount) + except StandardError, msg: + self.fail(msg) + + def CheckSelectOfNonPrintableString(self): + try: + a = '\x01\x02\x03\x04' + self.cur.execute('select %s as a', a) + r = self.cur.fetchone() + self.assertEqual(len(r.a), len(a), + "Length of result is %d, it should be %d." % + (len(r.a), len(a))) + self.failUnless(r.a == a, + "Result is '%s', it should be '%s'" % (r.a, a)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingIntWithPercentS(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%s)", (5,)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingLongWithPercentS(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%s)", (50000000L,)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingFloatWithPercentS(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%s)", (-3.24,)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingIntWithPyQuoting(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%i)", (5,)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingLongWithPyQuoting(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%i)", (50000000L,)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingFloatWithPyQuoting(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%f)", (-3.24,)) + except StandardError, msg: + self.fail(msg) + +def suite(): + dbapi_suite = unittest.makeSuite(DBAPICompliance, "Check") + module_suite = unittest.makeSuite(moduleTestCases, "Check") + test_suite = unittest.TestSuite((dbapi_suite, module_suite)) + return test_suite + +def main(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + main() |