From 79d16d72cd530acbee682ebee44d5b1d2010c661 Mon Sep 17 00:00:00 2001 From: Micah Anderson Date: Sun, 17 Nov 2013 17:25:01 -0500 Subject: initial import of debian package to build with autobuilder --- test/all_tests.py | 23 ++ test/api_tests.py | 524 +++++++++++++++++++++++++++++++++++++++++++++ test/logging_tests.py | 80 +++++++ test/lowlevel_tests.py | 162 ++++++++++++++ test/pgresultset_tests.py | 202 +++++++++++++++++ test/testsupport.py | 14 ++ test/transaction_tests.py | 106 +++++++++ test/type_tests.py | 342 +++++++++++++++++++++++++++++ test/userfunction_tests.py | 259 ++++++++++++++++++++++ 9 files changed, 1712 insertions(+) create mode 100644 test/all_tests.py create mode 100644 test/api_tests.py create mode 100644 test/logging_tests.py create mode 100644 test/lowlevel_tests.py create mode 100644 test/pgresultset_tests.py create mode 100644 test/testsupport.py create mode 100644 test/transaction_tests.py create mode 100644 test/type_tests.py create mode 100644 test/userfunction_tests.py (limited to 'test') diff --git a/test/all_tests.py b/test/all_tests.py new file mode 100644 index 0000000..df1a03d --- /dev/null +++ b/test/all_tests.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python +""" +This combines all PySQLite test suites into one big one. +""" + +import unittest, sys +import api_tests, logging_tests, lowlevel_tests, pgresultset_tests, type_tests +import userfunction_tests, transaction_tests + +def suite(): + suite = unittest.TestSuite((lowlevel_tests.suite(), api_tests.suite(), + type_tests.suite(), userfunction_tests.suite(), + transaction_tests.suite(), pgresultset_tests.suite(), + logging_tests.suite())) + + return suite + +def main(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + main() diff --git a/test/api_tests.py b/test/api_tests.py new file mode 100644 index 0000000..78f7c7f --- /dev/null +++ b/test/api_tests.py @@ -0,0 +1,524 @@ +#!/usr/bin/env python +import testsupport +import os, string, sys, types, unittest, weakref +import sqlite + +class DBAPICompliance(unittest.TestCase): + def CheckAPILevel(self): + self.assertEqual(sqlite.apilevel, '2.0', + 'apilevel is %s, should be 2.0' % sqlite.apilevel) + + def CheckThreadSafety(self): + self.assertEqual(sqlite.threadsafety, 1, + 'threadsafety is %d, should be 1' % sqlite.threadsafety) + + def CheckParamStyle(self): + self.assertEqual(sqlite.paramstyle, 'pyformat', + 'paramstyle is "%s", should be "pyformat"' % + sqlite.paramstyle) + + def CheckWarning(self): + self.assert_(issubclass(sqlite.Warning, StandardError), + 'Warning is not a subclass of StandardError') + + def CheckError(self): + self.failUnless(issubclass(sqlite.Error, StandardError), + 'Error is not a subclass of StandardError') + + def CheckInterfaceError(self): + self.failUnless(issubclass(sqlite.InterfaceError, sqlite.Error), + 'InterfaceError is not a subclass of Error') + + def CheckDatabaseError(self): + self.failUnless(issubclass(sqlite.DatabaseError, sqlite.Error), + 'DatabaseError is not a subclass of Error') + + def CheckDataError(self): + self.failUnless(issubclass(sqlite.DataError, sqlite.DatabaseError), + 'DataError is not a subclass of DatabaseError') + + def CheckOperationalError(self): + self.failUnless(issubclass(sqlite.OperationalError, sqlite.DatabaseError), + 'OperationalError is not a subclass of DatabaseError') + + def CheckIntegrityError(self): + self.failUnless(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), + 'IntegrityError is not a subclass of DatabaseError') + + def CheckInternalError(self): + self.failUnless(issubclass(sqlite.InternalError, sqlite.DatabaseError), + 'InternalError is not a subclass of DatabaseError') + + def CheckProgrammingError(self): + self.failUnless(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), + 'ProgrammingError is not a subclass of DatabaseError') + + def CheckNotSupportedError(self): + self.failUnless(issubclass(sqlite.NotSupportedError, + sqlite.DatabaseError), + 'NotSupportedError is not a subclass of DatabaseError') + +class moduleTestCases(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename) + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.ProgrammingError: + pass + + def CheckConnectionObject(self): + self.assert_(isinstance(self.cnx, sqlite.Connection), + 'sqlite.connect did not return a Connection object') + + def CheckConnectionClose(self): + self.assert_(hasattr(self.cnx, 'close') and + type(self.cnx.close) == types.MethodType, + 'close is not a method of Connection') + self.cnx.close() + self.removefile() + self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.close) + + def CheckConnectionCommit(self): + self.assert_(hasattr(self.cnx, "commit") and + type(self.cnx.commit) == types.MethodType, + 'commit is not a method of Connection') + self.cnx.close() + self.removefile() + self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.commit) + + def CheckConnectionRollback(self): + self.assert_(hasattr(self.cnx, "rollback") and + type(self.cnx.rollback) == types.MethodType, + 'rollback is not a method of Connection') + self.cnx.close() + self.removefile() + self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.rollback) + + def CheckConnectionCursor(self): + self.assert_(hasattr(self.cnx, "cursor") and + type(self.cnx.cursor) == types.MethodType, + 'cursor is not a method of Connection') + self.cnx.close() + self.removefile() + self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.cursor) + + def CheckCloseConnection(self): + self.cnx.close() + self.removefile() + + def CheckCursorObject(self): + self.assert_(isinstance(self.cur, sqlite.Cursor), + 'cnx.cursor() did not return a Cursor instance') + + def CheckCursorArraysize(self): + self.assert_(self.cur.arraysize == 1, + 'cur.arraysize is %d, it should be 1' % + self.cur.arraysize) + + def CheckCursorDescription(self): + self.assert_(self.cur.description == None, + "cur.description should be None at this point, it isn't.") + + def CheckCursorDescriptionNoRow(self): + """ + cursor.description should at least provide the column name(s), even if + no row returned. + """ + self.cur.execute("create table test(a, b)") + self.cur.execute("select a, b from test") + self.assert_(self.cur.description[0][0] == "a") + self.assert_(self.cur.description[1][0] == "b") + + def CheckCursorRowcount(self): + self.assert_(self.cur.rowcount == -1, + 'cur.rowcount is %d, should be -1' % self.cur.rowcount) + + def CheckCursorClose(self): + self.assert_(hasattr(self.cur, "close") and + type(self.cur.close) == types.MethodType, + 'close is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, self.cur.close) + + def CheckCursorExecute(self): + self.assert_(hasattr(self.cur, "execute") and + type(self.cur.execute) == types.MethodType, + 'execute is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.execute, 'SELECT max(3,4)') + + def CheckCursorExecutemany(self): + self.assert_(hasattr(self.cur, "executemany") and + type(self.cur.executemany) == types.MethodType, + 'executemany is not a method of the Cursor object') + + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.executemany, 'SELECT max(3,4)', [1,2]) + + def CheckCursorFetchone(self): + self.assert_(hasattr(self.cur, "fetchone") and + type(self.cur.fetchone) == types.MethodType, + 'fetchone is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, self.cur.fetchone) + + def CheckCursorFetchMany(self): + self.failUnless(hasattr(self.cur, "fetchmany") and + type(self.cur.fetchmany) == types.MethodType, + 'fetchmany is not a method of the Cursor object') + + cursor = self.cnx.cursor() + cursor.execute("create table test(id int)") + cursor.executemany("insert into test(id) values (%s)", range(10)) + cursor.execute("select id from test") + res = cursor.fetchmany() + self.failUnlessEqual(len(res), 1, """fetchmany should have returned a + list of length 1, but the list was %i elements long""" % len(res)) + res = cursor.fetchmany(2) + self.failUnlessEqual(len(res), 2, """fetchmany should have returned a + list of length 2, but the list was %i elements long""" % len(res)) + cursor.arraysize = 5 + res = cursor.fetchmany() + self.failUnlessEqual(len(res), 5, """fetchmany should have returned a + list of length 5, but the list was %i elements long""" % len(res)) + + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.fetchmany, 10) + + def CheckCursorFetchall(self): + self.failUnless(hasattr(self.cur, "fetchall") and + type(self.cur.fetchall) == types.MethodType, + 'fetchall is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.fetchall) + + def CheckCursorSetoutputsize(self): + self.failUnless(hasattr(self.cur, "setoutputsize") and + type(self.cur.setoutputsize) == types.MethodType, + 'setoutputsize is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.setoutputsize, 1024) + + def CheckCursorSetinputsizes(self): + self.failUnless(hasattr(self.cur, "setinputsizes") and + type(self.cur.setinputsizes) == types.MethodType, + 'setinputsizes is not a method of the Cursor object') + self.cur.close() + self.failUnlessRaises(sqlite.ProgrammingError, + self.cur.setinputsizes, [1, 2, 3]) + + def CheckExecuteWithSingleton(self): + """Test execute() with a singleton string as the parameter.""" + try: + self.cur.execute("select max(3,4)") + except StandardError, msg: + self.fail(msg) + + self.assertEqual(type(self.cur.description), types.TupleType, + "cur.description should be a tuple, but isn't.") + + clen = len(self.cur.description) + self.assertEqual(clen, 1, + "Length of cur.description is %d, it should be %d." % + (clen, 1)) + + + self.assertEqual(len(self.cur.description[0]), 7, + "Length of cur.description[0] is %d, it should be 7." % + len(self.cur.description[0])) + + + self.failUnless(self.cur.description[0][0] == "max(3,4)" and + self.cur.description[0][1] == sqlite.NUMBER and + self.cur.description[0][2] == None and + self.cur.description[0][3] == None and + self.cur.description[0][4] == None and + self.cur.description[0][5] == None and + self.cur.description[0][6] == None, + "cur.description[0] does not match the query.") + self.cur.close() + + def CheckExecuteWithTuple(self): + """Test execute() with a tuple as the parameter.""" + try: + self.cur.execute("select max(%s, %s)", (4, 5)) + except StandardError, msg: + self.fail(msg) + + # Empty tuple + try: + self.cur.execute("select 3+4", ()) + except StandardError, msg: + self.fail(msg) + self.cur.close() + + def CheckExecuteWithDictionary(self): + """Test execute() with a dictionary as the parameter.""" + try: + self.cur.execute("select max(%(n1)s, %(n2)s)", {"n1": 5, "n2": 6}) + except StandardError, msg: + self.fail(msg) + self.cur.close() + + def CheckQuotingOfLong(self): + """Test wether longs are quoted properly for SQL.""" + try: + self.cur.execute("-- types long") + self.cur.execute("select %s + %s as x", (5L, 6L)) + except StandardError, msg: + self.fail(msg) + res = self.cur.fetchone() + self.failUnlessEqual(res.x, 11L, + "The addition of long should have returned %i, returned %i" + % (11L, res.x)) + + def CheckCursorIterator(self): + self.cur.execute("create table test (id, name)") + self.cur.executemany("insert into test (id) values (%s)", + [(1,), (2,), (3,)]) + self.cur.execute("-- types int") + self.cur.execute("select id from test") + + if sys.version_info[:2] >= (2,2): + counter = 0 + for row in self.cur: + if counter == 0: + self.failUnlessEqual(row.id, 1, + "row.id should have been 1, was %i" % row.id) + elif counter == 1: + self.failUnlessEqual(row.id, 2, + "row.id should have been 2, was %i" % row.id) + elif counter == 2: + self.failUnlessEqual(row.id, 3, + "row.id should have been 3, was %i" % row.id) + else: + self.fail("Iterated over too many rows.") + counter += 1 + else: + # Python 2.1 + counter = 0 + try: + while 1: + row = self.cur.next() + if counter == 0: + self.failUnlessEqual(row.id, 1, + "row.id should have been 1, was %i" % row.id) + elif counter == 1: + self.failUnlessEqual(row.id, 2, + "row.id should have been 2, was %i" % row.id) + elif counter == 2: + self.failUnlessEqual(row.id, 3, + "row.id should have been 3, was %i" % row.id) + else: + self.fail("Iterated over too many rows.") + counter += 1 + except IndexError: + pass + self.failUnlessEqual(counter, 3, + "Should have iterated over 3 items, was: %i" % counter) + + def CheckCursorScrollAndRownumber(self): + self.cur.execute("create table test (id, name)") + values = [("foo",)] * 20 + self.cur.executemany("insert into test (name) values (%s)", values) + self.cur.execute("select name from test") + self.failUnlessEqual(self.cur.rownumber, 0, + "Directly after execute, rownumber must be 0, is: %i" + % self.cur.rownumber) + + self.cur.scroll(1, "absolute") + self.cur.scroll(5, "absolute") + self.failUnlessEqual(self.cur.rownumber, 5, + "rownumber should be 5, is: %i" + % self.cur.rownumber) + + self.cur.scroll(1, "relative") + self.failUnlessEqual(self.cur.rownumber, 6, + "rownumber should be 6, is: %i" + % self.cur.rownumber) + + self.cur.scroll(-2, "relative") + self.failUnlessEqual(self.cur.rownumber, 4, + "rownumber should be 4, is: %i" + % self.cur.rownumber) + + self.failUnlessRaises(IndexError, self.cur.scroll, -2, "absolute") + self.failUnlessRaises(IndexError, self.cur.scroll, 1000, "absolute") + + self.cur.scroll(10, "absolute") + self.failUnlessRaises(IndexError, self.cur.scroll, -11, "relative") + + self.cur.scroll(10, "absolute") + self.failUnlessRaises(IndexError, self.cur.scroll, 30, "relative") + + def CheckCursorConnection(self): + if not isinstance(self.cur.connection, weakref.ProxyType) and \ + not isinstance(self.cur.connection, weakref.CallableProxyType): + fail("cursor.connection doesn't return the correct type") + + def CheckCursorLastRowID(self): + self.cur.execute("create table test (id integer primary key, name)") + + self.cur.execute("insert into test(name) values ('foo')") + self.failUnlessEqual(self.cur.lastrowid, 1, + "lastrowid should be 1, is %i" % self.cur.lastrowid) + + self.cur.execute("insert into test(name) values ('foo')") + self.failUnlessEqual(self.cur.lastrowid, 2, + "lastrowid should be 2, is %i" % self.cur.lastrowid) + + def CheckResultObject(self): + try: + self.cur.execute("select max(3,4)") + self.assertEqual(self.cur.rowcount, 1, + "cur.rowcount is %d, it should be 1." % + self.cur.rowcount) + self.res = self.cur.fetchall() + except StandardError, msg: + self.fail(msg) + + self.assertEqual(type(self.res), types.ListType, + 'cur.fetchall() did not return a sequence.') + + self.assertEqual(len(self.res), 1, + 'Length of the list of results is %d, it should be 1' % + len(self.res)) + + self.failUnless(isinstance(self.res[0], sqlite.PgResultSet), + 'cur.fetchall() did not return a list of PgResultSets.') + + def CheckResultFetchone(self): + try: + self.cur.execute("select max(3,4)") + self.res = self.cur.fetchone() + self.assertEqual(self.cur.rowcount, 1, + 'cur.rowcount is %d, it should be 1.' % + self.cur.rowcount) + except StandardError, msg: + self.fail(msg) + + self.failUnless(isinstance(self.res, sqlite.PgResultSet), + "cur.fetchone() does not return a PgResultSet.") + + try: + self.res = self.cur.fetchone() + self.assertEqual(self.res, None, + "res should be None at this point, but it isn't.") + except StandardError, msg: + self.fail(msg) + + def CheckRowCountAfterInsert(self): + try: + self.cur.execute("create table test(a)") + self.cur.execute("insert into test(a) values (5)") + self.assertEqual(self.cur.rowcount, 1, + 'cur.rowcount is %d, it should be 1.' % + self.cur.rowcount) + except StandardError, msg: + self.fail(msg) + + def CheckRowCountAfterUpdate(self): + try: + self.cur.execute("create table test(a, b)") + self.cur.execute("insert into test(a, b) values (1, 2)") + self.cur.execute("insert into test(a, b) values (1, 3)") + self.cur.execute("insert into test(a, b) values (1, 4)") + self.cur.execute("update test set b=1 where a=1") + self.assertEqual(self.cur.rowcount, 3, + 'cur.rowcount is %d, it should be 3.' % + self.cur.rowcount) + except StandardError, msg: + self.fail(msg) + + def CheckRowCountAfterDelete(self): + try: + self.cur.execute("create table test(a, b)") + self.cur.execute("insert into test(a, b) values (1, 2)") + self.cur.execute("insert into test(a, b) values (1, 3)") + self.cur.execute("insert into test(a, b) values (2, 4)") + self.cur.execute("delete from test where a=1") + self.assertEqual(self.cur.rowcount, 2, + 'cur.rowcount is %d, it should be 2.' % + self.cur.rowcount) + except StandardError, msg: + self.fail(msg) + + def CheckSelectOfNonPrintableString(self): + try: + a = '\x01\x02\x03\x04' + self.cur.execute('select %s as a', a) + r = self.cur.fetchone() + self.assertEqual(len(r.a), len(a), + "Length of result is %d, it should be %d." % + (len(r.a), len(a))) + self.failUnless(r.a == a, + "Result is '%s', it should be '%s'" % (r.a, a)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingIntWithPercentS(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%s)", (5,)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingLongWithPercentS(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%s)", (50000000L,)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingFloatWithPercentS(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%s)", (-3.24,)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingIntWithPyQuoting(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%i)", (5,)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingLongWithPyQuoting(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%i)", (50000000L,)) + except StandardError, msg: + self.fail(msg) + + def CheckQuotingFloatWithPyQuoting(self): + try: + self.cur.execute("create table test(a number)") + self.cur.execute("insert into test(a) values (%f)", (-3.24,)) + except StandardError, msg: + self.fail(msg) + +def suite(): + dbapi_suite = unittest.makeSuite(DBAPICompliance, "Check") + module_suite = unittest.makeSuite(moduleTestCases, "Check") + test_suite = unittest.TestSuite((dbapi_suite, module_suite)) + return test_suite + +def main(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + main() diff --git a/test/logging_tests.py b/test/logging_tests.py new file mode 100644 index 0000000..ed7bf24 --- /dev/null +++ b/test/logging_tests.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +import testsupport +import StringIO, unittest +import sqlite + +class LogFileTemplate: + def write(self, s): + pass + +class LogFile: + def __init__(self): + pass + +def init_LogFile(): + LogFile.write = LogFileTemplate.write + +class CommandLoggingTests(unittest.TestCase, testsupport.TestSupport): + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.InterfaceError: + pass + + def CheckNoWrite(self): + init_LogFile() + del LogFile.write + logger = LogFile() + try: + self.cnx = sqlite.connect(self.getfilename(), + command_logfile=logger) + + self.fail("ValueError not raised") + except ValueError: + pass + + def CheckWriteNotCallable(self): + logger = LogFile() + logger.write = 5 + try: + self.cnx = sqlite.connect(self.getfilename(), + command_logfile=logger) + + self.fail("ValueError not raised") + except ValueError: + pass + + def CheckLoggingWorks(self): + logger = StringIO.StringIO() + + expected_output = "\n".join([ + "BEGIN", "CREATE TABLE TEST(FOO INTEGER)", + "INSERT INTO TEST(FOO) VALUES (5)", + "ROLLBACK"]) + "\n" + + self.cnx = sqlite.connect(self.getfilename(), + command_logfile=logger) + cu = self.cnx.cursor() + cu.execute("CREATE TABLE TEST(FOO INTEGER)") + cu.execute("INSERT INTO TEST(FOO) VALUES (%i)", (5,)) + self.cnx.rollback() + + logger.seek(0) + real_output = logger.read() + + if expected_output != real_output: + self.fail("Logging didn't produce expected output.") + +def suite(): + command_logging_suite = unittest.makeSuite(CommandLoggingTests, "Check") + return command_logging_suite + +def main(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + main() diff --git a/test/lowlevel_tests.py b/test/lowlevel_tests.py new file mode 100644 index 0000000..7a1d537 --- /dev/null +++ b/test/lowlevel_tests.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python +""" +These are the tests for the low-level module _sqlite. + +They try to execute as much of the low-level _sqlite module as possible to +facilitate coverage testing with the help of gcov. +""" + +from __future__ import nested_scopes +import testsupport +import os, unittest, re +import _sqlite +from sqlite import ProgrammingError + +class lowlevelTestCases(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = _sqlite.connect(self.filename) + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + del self.cnx + except AttributeError: + pass + except ProgrammingError: + pass + + def CheckModuleAttributeAccess(self): + for attr in dir(_sqlite): + _sqlite.__dict__[attr] + + def CheckConnectionAttributes(self): + self.cnx.filename + self.cnx.sql + self.cnx.sqlite_changes() + self.cnx.sqlite_last_insert_rowid() + + try: + self.cnx.foo = 7 + self.fail("Could set attribute. Connection object should be read-only.") + except TypeError: + pass + + def CheckSQLiteExec(self): + self.cnx.execute("create table test(id int, name varchar(20))") + self.cnx.execute("insert into test(id, name) values (1, 'foo')") + self.cnx.execute("insert into test(id, name) values (2, 'bar')") + + expected_colnames = ('id', 'name') + expected_values = [('1', 'foo'), ('2', 'bar')] + failures = [] + + def callback(arg1, items, colnames): + if colnames != expected_colnames: + failures.append("expected colnames %s, got %s" + % (repr(expected_colnames), repr(colnames))) + if items not in expected_values: + failures.append("%s not in expected_values %s" + % (repr(items), repr(expected_values))) + else: + expected_values.pop(0) + + self.cnx.sqlite_exec("select * from test", callback, None) + if len(failures) > 0: + for failure in failures: + self.fail(failure) + + def CheckSQLiteLastInsertRowID(self): + self.cnx.execute("create table test(id integer primary key, name varchar(20))") + self.cnx.execute("insert into test(id, name) values (NULL, 'foo')") + self.cnx.execute("insert into test(id, name) values (NULL, 'bar')") + rowid = self.cnx.sqlite_last_insert_rowid() + self.failUnlessEqual(rowid, 2, + "last inserted rowid should have been %i, was %i" + % (2, rowid)) + + def CheckSQLiteChanges(self): + self.cnx.execute("create table test(id integer primary key, name varchar(20))") + self.cnx.execute("insert into test(id, name) values (NULL, 'foo')") + self.cnx.execute("insert into test(id, name) values (NULL, 'bar')") + self.cnx.execute("insert into test(id, name) values (NULL, 'baz')") + self.cnx.execute("delete from test where name='baz'") + changed = self.cnx.sqlite_changes() + self.failUnlessEqual(changed, 1, + "changed rows should have been %i, was %i" + % (1, changed)) + self.cnx.execute("update test set name='foobar' where id < 10") + changed = self.cnx.sqlite_changes() + self.failUnlessEqual(changed, 2, + "changed rows should have been %i, was %i" + % (2, changed)) + + def CheckConnectionForProgrammingError(self): + self.cnx.close() + self.removefile() + + self.failUnlessRaises(ProgrammingError, self.cnx.close) + self.failUnlessRaises(ProgrammingError, self.cnx.execute, "") + + def CheckConnectionForNumberOfArguments(self): + self.failUnlessRaises(TypeError, self.cnx.close, None) + self.failUnlessRaises(TypeError, self.cnx.execute, None, None) + self.failUnlessRaises(TypeError, self.cnx.sqlite_changes, None) + self.failUnlessRaises(TypeError, self.cnx.sqlite_exec, None) + self.failUnlessRaises(TypeError, self.cnx.sqlite_last_insert_rowid, None) + + def CheckConnectionDestructor(self): + del self.cnx + self.removefile() + + def CheckResultObject(self): + create_statement = "create table test(id INTEGER, name TEXT)" + self.cnx.execute(create_statement) + + self.failUnlessEqual(create_statement, self.cnx.sql, + ".sql should have been %s, was %s" % (create_statement, self.cnx.sql)) + + self.cnx.execute("insert into test(id, name) values (4, 'foo')") + self.cnx.execute("insert into test(id, name) values (5, 'bar')") + + res = self.cnx.execute("select id, name from test") + self.failUnless(res.rowcount == 2, "Should have returned 2 rows, but was %i" % res.rowcount) + + correct_col_defs = (('id', _sqlite.INTEGER, None, None, None, None, None), \ + ('name', _sqlite.STRING, None, None, None, None, None)) + self.assertEqual(res.col_defs, correct_col_defs, + "col_defs should have been %s, was %s" % (repr(correct_col_defs), repr(res.col_defs))) + + correct_row_list = [(4, 'foo'), (5, 'bar')] + self.assertEqual(res.row_list, correct_row_list, + "rowlist should have been %s, was %s" % (repr(correct_row_list), repr(res.row_list))) + + def CheckResultAttributes(self): + res = self.cnx.execute("select NULL, max(4,5)") + try: + res.foo = 7 + + except TypeError: + pass + + def CheckSQLiteVersion(self): + try: + ver = _sqlite.sqlite_version() + except: + self.fail('sqlite_version() failed') + pat = re.compile(r'\d*\.\d*\.\d*') + if not re.match(pat,ver): + self.fail('Incorrect sqlite_version() format, ' + 'should be digits.digits.digits, was %s'%ver) + + +def suite(): + return unittest.makeSuite(lowlevelTestCases, "Check") + +def main(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + main() diff --git a/test/pgresultset_tests.py b/test/pgresultset_tests.py new file mode 100644 index 0000000..81f1d46 --- /dev/null +++ b/test/pgresultset_tests.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python +import testsupport +import os, unittest, sys +import sqlite + +class PgResultSetTests(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename) + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.ProgrammingError: + pass + + def getResult(self): + try: + self.cur.execute("DROP TABLE TEST") + except sqlite.DatabaseError, reason: + pass + + self.cur.execute("CREATE TABLE TEST (id, name, age)") + self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)", + (5, 'Alice', 29)) + self.cur.execute("-- types int, str, int") + self.cur.execute("SELECT id, name, age FROM TEST") + return self.cur.fetchone() + + def CheckAttributeAccess(self): + res = self.getResult() + if not hasattr(res, "id"): + self.fail("Resultset doesn't have attribute 'id'") + if not hasattr(res, "ID"): + self.fail("Resultset doesn't have attribute 'ID'") + + def CheckAttributeValue(self): + res = self.getResult() + if res.id != 5: + self.fail("id should be 5, is %i" % res.id) + if res.ID != 5: + self.fail("ID should be 5, is %i" % res.ID) + + def CheckKeyAccess(self): + res = self.getResult() + if not "id" in res: + self.fail("Resultset doesn't have item 'id'") + if not "ID" in res: + self.fail("Resultset doesn't have item 'ID'") + + def CheckKeyValue(self): + res = self.getResult() + if res["id"] != 5: + self.fail("id should be 5, is %i" % res.id) + if res["ID"] != 5: + self.fail("ID should be 5, is %i" % res.ID) + + def CheckIndexValue(self): + res = self.getResult() + if res[0] != 5: + self.fail("item 0 should be 5, is %i" % res.id) + + def Check_haskey(self): + res = self.getResult() + if not res.has_key("id"): + self.fail("resultset should have key 'id'") + if not res.has_key("ID"): + self.fail("resultset should have key 'ID'") + if not res.has_key("Id"): + self.fail("resultset should have key 'Id'") + + def Check_len(self): + l = len(self.getResult()) + if l != 3: + self.fail("length of resultset should be 3, is %i", l) + + def Check_keys(self): + res = self.getResult() + if res.keys() != ["id", "name", "age"]: + self.fail("keys() should return %s, returns %s" % + (["id", "name", "age"], res.keys())) + + def Check_values(self): + val = self.getResult().values() + if val != (5, 'Alice', 29): + self.fail("Wrong values(): %s" % val) + + def Check_items(self): + it = self.getResult().items() + if it != [("id", 5), ("name", 'Alice'), ("age", 29)]: + self.fail("Wrong items(): %s" % it) + + def Check_get(self): + res = self.getResult() + v = res.get("id") + if v != 5: + self.fail("Wrong result for get [1]") + + v = res.get("ID") + if v != 5: + self.fail("Wrong result for get [2]") + + v = res.get("asdf") + if v is not None: + self.fail("Wrong result for get [3]") + + v = res.get("asdf", 6) + if v != 6: + self.fail("Wrong result for get [4]") + +class TupleResultTests(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename) + self.cnx.rowclass = tuple + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.ProgrammingError: + pass + + def getOneResult(self): + try: + self.cur.execute("DROP TABLE TEST") + except sqlite.DatabaseError, reason: + pass + + self.cur.execute("CREATE TABLE TEST (id, name, age)") + self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)", + (5, 'Alice', 29)) + self.cur.execute("-- types int, str, int") + self.cur.execute("SELECT id, name, age FROM TEST") + return self.cur.fetchone() + + def getManyResults(self): + try: + self.cur.execute("DROP TABLE TEST") + except sqlite.DatabaseError, reason: + pass + + self.cur.execute("CREATE TABLE TEST (id, name, age)") + self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)", + (5, 'Alice', 29)) + self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)", + (5, 'Alice', 29)) + self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)", + (5, 'Alice', 29)) + self.cur.execute("-- types int, str, int") + self.cur.execute("SELECT id, name, age FROM TEST") + return self.cur.fetchmany(2) + + def getAllResults(self): + try: + self.cur.execute("DROP TABLE TEST") + except sqlite.DatabaseError, reason: + pass + + self.cur.execute("CREATE TABLE TEST (id, name, age)") + self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)", + (5, 'Alice', 29)) + self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)", + (5, 'Alice', 29)) + self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)", + (5, 'Alice', 29)) + self.cur.execute("-- types int, str, int") + self.cur.execute("SELECT id, name, age FROM TEST") + return self.cur.fetchall() + + def CheckRowTypeIsTupleFetchone(self): + res = self.getOneResult() + self.failUnless(type(res) is tuple, "Result type of row isn't a tuple") + + def CheckRowTypeIsTupleFetchmany(self): + res = self.getManyResults() + self.failUnless(type(res[1]) is tuple, "Result type of row isn't a tuple") + + def CheckRowTypeIsTupleFetchall(self): + res = self.getAllResults() + self.failUnless(type(res[2]) is tuple, "Result type of row isn't a tuple") + +def suite(): + tests = [unittest.makeSuite(PgResultSetTests, "Check"), + unittest.makeSuite(PgResultSetTests, "Check")] + if sys.version_info >= (2,2): + tests.append(unittest.makeSuite(TupleResultTests, "Check")) + return unittest.TestSuite(tests) + +def main(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + main() diff --git a/test/testsupport.py b/test/testsupport.py new file mode 100644 index 0000000..18d9ff4 --- /dev/null +++ b/test/testsupport.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +import _sqlite +import os, tempfile, unittest + +class TestSupport: + def getfilename(self): + if _sqlite.sqlite_version_info() >= (2, 8, 2): + return ":memory:" + else: + return tempfile.mktemp() + + def removefile(self): + if self.filename != ":memory:": + os.remove(self.filename) diff --git a/test/transaction_tests.py b/test/transaction_tests.py new file mode 100644 index 0000000..57bc70f --- /dev/null +++ b/test/transaction_tests.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python +import testsupport +import os, string, sys, types, unittest +import sqlite + +class TransactionTests(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename) + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.InterfaceError: + pass + + def CheckValueInTransaction(self): + self.cur.execute("create table test (a)") + self.cur.execute("insert into test (a) values (%s)", "foo") + self.cur.execute("-- types int") + self.cur.execute("select count(a) as count from test") + res = self.cur.fetchone() + self.failUnlessEqual(res.count, 1, + "Wrong number of rows during transaction.") + + def CheckValueAfterCommit(self): + self.cur.execute("create table test (a)") + self.cur.execute("insert into test (a) values (%s)", "foo") + self.cur.execute("-- types int") + self.cur.execute("select count(a) as count from test") + self.cnx.commit() + res = self.cur.fetchone() + self.failUnlessEqual(res.count, 1, + "Wrong number of rows during transaction.") + + def CheckValueAfterRollback(self): + self.cur.execute("create table test (a)") + self.cnx.commit() + self.cur.execute("insert into test (a) values (%s)", "foo") + self.cnx.rollback() + self.cur.execute("-- types int") + self.cur.execute("select count(a) as count from test") + res = self.cur.fetchone() + self.failUnlessEqual(res.count, 0, + "Wrong number of rows during transaction.") + + def CheckImmediateCommit(self): + try: + self.cnx.commit() + except: + self.fail("Immediate commit raises exeption.") + + def CheckImmediateRollback(self): + try: + self.cnx.rollback() + except: + self.fail("Immediate rollback raises exeption.") + +class AutocommitTests(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename, autocommit=1) + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.InterfaceError: + pass + + def CheckCommit(self): + self.cur.execute("select abs(5)") + try: + self.cnx.commit() + except: + self.fail(".commit() raised an exception") + + def CheckRollback(self): + self.cur.execute("select abs(5)") + self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.rollback) + +class ChangeAutocommitTests(unittest.TestCase): + pass + +def suite(): + transaction_tests = unittest.makeSuite(TransactionTests, "Check") + autocommit_tests = unittest.makeSuite(AutocommitTests, "Check") + change_autocommit_tests = unittest.makeSuite(ChangeAutocommitTests, "Check") + + test_suite = unittest.TestSuite((transaction_tests, autocommit_tests, + change_autocommit_tests)) + return test_suite + +def main(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + main() diff --git a/test/type_tests.py b/test/type_tests.py new file mode 100644 index 0000000..a8d2031 --- /dev/null +++ b/test/type_tests.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python +#-*- coding: ISO-8859-1 -*- +import testsupport +import os, string, sys, types, unittest +import sqlite +import _sqlite + +try: + from mx.DateTime import Date, Time, DateTime, DateTimeDelta, DateFrom, \ + TimeFrom, DateTimeDeltaFrom + have_datetime = 1 +except ImportError: + have_datetime = 0 + +def sqlite_is_at_least(major, minor, micro): + version = map(int, _sqlite.sqlite_version().split(".")) + return version >= (major, minor, micro) + +class MyType: + def __init__(self, val): + self.val = int(val) + + def _quote(self): + return str(self.val) + + def __repr__(self): + return "MyType(%s)" % self.val + + def __cmp__(self, other): + assert(isinstance(other, MyType)) + return cmp(self.val, other.val) + +class MyTypeNew(MyType): + def __quote__(self): + return str(self.val) + + def __getattr__(self, key): + # Forbid access to the old-style _quote method + if key == "_quote": + raise AttributeError + else: + return self.__dict__[key] + +class ExpectedTypes(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename, converters={"mytype": MyType}) + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.InterfaceError: + pass + + def CheckExpectedTypesStandardTypes(self): + self.cur.execute("create table test (a, b, c)") + self.cur.execute("insert into test(a, b, c) values (5, 6.3, 'hello')") + self.cur.execute("-- types int, float, str") + self.cur.execute("select * from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.a, types.IntType), + "The built-in int converter didn't work.") + self.failUnless(isinstance(res.b, types.FloatType), + "The built-in float converter didn't work.") + self.failUnless(isinstance(res.c, types.StringType), + "The built-in string converter didn't work.") + + def CheckExpectedTypesStandardTypesNull(self): + self.cur.execute("create table test (a, b, c)") + self.cur.execute("insert into test(a, b, c) values (NULL, NULL, NULL)") + self.cur.execute("-- types int, float, str") + self.cur.execute("select * from test") + res = self.cur.fetchone() + self.failUnless(res.a == None, + "The built-in int converter should have returned None.") + self.failUnless(res.b == None, + "The built-in float converter should have returned None.") + self.failUnless(res.c == None, + "The built-in string converter should have returned None.") + + def CheckExpectedTypesCustomTypes(self): + value = MyType(10) + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", value) + self.cur.execute("-- types mytype") + self.cur.execute("select a from test") + res = self.cur.fetchone() + + self.failUnless(isinstance(res.a, MyType), + "The converter did return the wrong type.") + self.failUnlessEqual(value, res.a, + "The returned value and the inserted one are different.") + + def CheckNewQuoteMethod(self): + value = MyTypeNew(10) + self.cur.execute("create table test (a integer)") + self.cur.execute("insert into test(a) values (%s)", value) + self.cur.execute("select a from test") + res = self.cur.fetchone() + + self.failUnlessEqual(10, res.a, + "The returned value and the inserted one are different.") + + def CheckExpectedTypesCustomTypesNull(self): + value = None + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", value) + self.cur.execute("-- types mytype") + self.cur.execute("select a from test") + res = self.cur.fetchone() + + self.failUnless(res.a == None, + "The converter should have returned None.") + + def CheckResetExpectedTypes(self): + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values ('5')") + self.cur.execute("-- types int") + self.cur.execute("select a from test") + self.cur.execute("select a from test") + res = self.cur.fetchone() + self.assert_(isinstance(res.a, types.StringType), + "'resetting types' didn't succeed.") + + if have_datetime: + def CheckDateTypes(self): + dt = DateTime(2002, 6, 15) + dtd = DateTimeDelta(0, 0, 0, 1) + + self.cur.execute("create table test (t timestamp)") + self.cur.execute("insert into test(t) values (%s)", (dt,)) + self.cur.execute("select t from test") + res = self.cur.fetchone() + + self.failUnlessEqual(dt, res.t, + "DateTime object should have been %s, was %s" + % (repr(dt), repr(res.t))) + + self.cur.execute("drop table test") + self.cur.execute("create table test(i interval)") + self.cur.execute("insert into test(i) values (%s)", (dtd,)) + self.cur.execute("select i from test") + res = self.cur.fetchone() + + self.failUnlessEqual(dtd, res.i, + "DateTimeDelta object should have been %s, was %s" + % (repr(dtd), repr(res.i))) + +class UnicodeTestsLatin1(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename, encoding=("iso-8859-1",)) + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.InterfaceError: + pass + + def CheckGetSameBack(self): + test_str = unicode("sterreich", "latin1") + self.cur.execute("create table test (a UNICODE)") + self.cur.execute("insert into test(a) values (%s)", test_str) + self.cur.execute("select a from test") + res = self.cur.fetchone() + self.failUnlessEqual(type(test_str), type(res.a), + "Something other than a Unicode string was fetched: %s" + % (str(type(res.a)))) + self.failUnlessEqual(test_str, res.a, + "Fetching the unicode string doesn't return the inserted one.") + +class UnicodeTestsUtf8(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename, encoding="utf-8") + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.InterfaceError: + pass + + def CheckGetSameBack(self): + # PREZIDENT ROSSI'SKO' FEDERACII sterreich + test_str = unicode("ПРЕЗИДЕНТ РОССИЙСКОЙ ФЕДЕРАЦИИ Österreich", "utf-8") + + self.cur.execute("create table test (a UNICODE)") + self.cur.execute("insert into test(a) values (%s)", test_str) + self.cur.execute("select a from test") + res = self.cur.fetchone() + self.failUnlessEqual(type(test_str), type(res.a), + "Something other than a Unicode string was fetched: %s" + % (str(type(res.a)))) + self.failUnlessEqual(test_str, res.a, + "Fetching the unicode string doesn't return the inserted one.") + +class UnicodeTestsKOI8R(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename, encoding="koi8-r") + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.InterfaceError: + pass + + def CheckGetSameBack(self): + # PREZIDENT ROSSI'SKO' FEDERACII + # (President of the Russian Federation) + test_str = unicode(" ", "koi8-r") + + self.cur.execute("create table test (a UNICODE)") + self.cur.execute("insert into test(a) values (%s)", test_str) + self.cur.execute("select a from test") + res = self.cur.fetchone() + self.failUnlessEqual(type(test_str), type(res.a), + "Something other than a Unicode string was fetched: %s" + % (str(type(res.a)))) + self.failUnlessEqual(test_str, res.a, + "Fetching the unicode string doesn't return the inserted one.") + +class SQLiteBuiltinTypeSupport(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename, encoding="koi8-r") + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.InterfaceError: + pass + + def CheckInt(self): + self.cur.execute("create table test (a INTEGER)") + self.cur.execute("insert into test(a) values (%s)", 5) + self.cur.execute("select a from test") + res = self.cur.fetchone() + self.failUnlessEqual(type(5), type(res.a), + "Something other than an INTEGER was fetched: %s" + % (str(type(res.a)))) + + def CheckFloat(self): + self.cur.execute("create table test (a FLOAT)") + self.cur.execute("insert into test(a) values (%s)", 5.7) + self.cur.execute("select a from test") + res = self.cur.fetchone() + self.failUnlessEqual(type(5.7), type(res.a), + "Something other than a FLOAT was fetched: %s" + % (str(type(res.a)))) + + def CheckString(self): + self.cur.execute("create table test (a VARCHAR(20))") + self.cur.execute("insert into test(a) values (%s)", "foo") + self.cur.execute("select a from test") + res = self.cur.fetchone() + self.failUnlessEqual(type("foo"), type(res.a), + "Something other than a VARCHAR was fetched: %s" + % (str(type(res.a)))) + + def CheckBinary(self): + bindata = "".join([chr(x) for x in range(256)]) + self.cur.execute("create table test(b BINARY)") + self.cur.execute("insert into test(b) values (%s)", sqlite.Binary(bindata)) + self.cur.execute("select b from test") + res = self.cur.fetchone() + self.failUnlessEqual(bindata, res.b, "Binary roundtrip didn't produce original string") + self.failUnlessEqual(self.cur.description[0][1], sqlite.BINARY, "Wrong type code") + + if have_datetime: + def CheckDate(self): + self.cur.execute("create table test (a DATE)") + d = DateFrom("2002-05-07") + self.cur.execute("insert into test(a) values (%s)", d) + self.cur.execute("select a from test") + res = self.cur.fetchone() + if res.a != d: + self.fail("didn't get back the same DATE") + + def CheckTime(self): + self.cur.execute("create table test (a TIME)") + t = TimeFrom("22:15:00") + self.cur.execute("insert into test(a) values (%s)", t) + self.cur.execute("select a from test") + res = self.cur.fetchone() + if res.a != t: + self.fail("didn't get back the same TIME") + + def CheckTimestamp(self): + self.cur.execute("create table test (a TIMESTAMP)") + d = DateFrom("2002-05-07 22:15:00") + self.cur.execute("insert into test(a) values (%s)", d) + self.cur.execute("select a from test") + res = self.cur.fetchone() + if res.a != d: + self.fail("didn't get back the same TIMESTAMP") + + def CheckInterval(self): + self.cur.execute("create table test (a INTERVAL)") + d = DateTimeDeltaFrom("02:00:00") + self.cur.execute("insert into test(a) values (%s)", d) + self.cur.execute("select a from test") + res = self.cur.fetchone() + if res.a != d: + self.fail("didn't get back the same INTERVAL") + +def suite(): + expected_suite = unittest.makeSuite(ExpectedTypes, "Check") + unicode_suite1 = unittest.makeSuite(UnicodeTestsLatin1, "Check") + unicode_suite2 = unittest.makeSuite(UnicodeTestsUtf8, "Check") + unicode_suite3 = unittest.makeSuite(UnicodeTestsKOI8R, "Check") + builtin_suite = unittest.makeSuite(SQLiteBuiltinTypeSupport, "Check") + + return unittest.TestSuite((expected_suite, unicode_suite1, unicode_suite2, + unicode_suite3, builtin_suite)) + +def main(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + main() diff --git a/test/userfunction_tests.py b/test/userfunction_tests.py new file mode 100644 index 0000000..ce70387 --- /dev/null +++ b/test/userfunction_tests.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python +import testsupport +import os, string, sys, types, unittest +import sqlite + +def intreturner(x): + return int(x) * 2 + +def floatreturner(x): + return float(x) * 2.0 + +def stringreturner(x): + return "[%s]" % x + +def nullreturner(x): + return None + +def exceptionreturner(x): + return 5 / 0 + +class MySum: + def __init__(self): + self.reset() + + def reset(self): + self.sum = 0 + + def step(self, x): + self.sum += int(x) + + def finalize(self): + val = self.sum + self.reset() + return val + +class MySumFloat: + def __init__(self): + self.reset() + + def reset(self): + self.sum = 0.0 + + def step(self, x): + self.sum += float(x) + + def finalize(self): + val = self.sum + self.reset() + return val + +class MySumReturnNull: + def __init__(self): + self.reset() + + def reset(self): + self.sum = 0 + + def step(self, x): + self.sum += int(x) + + def finalize(self): + return None + +class MySumStepExeption: + def __init__(self): + self.reset() + + def reset(self): + self.sum = 0 + + def step(self, x): + self.sum += int(x) / 0 + + def finalize(self): + val = self.sum + self.reset() + return val + +class MySumFinalizeExeption: + def __init__(self): + self.reset() + + def reset(self): + self.sum = 0 + + def step(self, x): + self.sum += int(x) + + def finalize(self): + val = self.sum / 0 + self.reset() + return val + +class UserFunctions(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename) + + sqlite._sqlite.enable_callback_debugging(0) + + self.cnx.create_function("intreturner", 1, intreturner) + self.cnx.create_function("floatreturner", 1, floatreturner) + self.cnx.create_function("stringreturner", 1, stringreturner) + self.cnx.create_function("nullreturner", 1, nullreturner) + self.cnx.create_function("exceptionreturner", 1, exceptionreturner) + + self.cnx.create_aggregate("mysum", 1, MySum) + self.cnx.create_aggregate("mysumfloat", 1, MySumFloat) + self.cnx.create_aggregate("mysumreturnnull", 1, MySumReturnNull ) + self.cnx.create_aggregate("mysumstepexception", 1, MySumStepExeption) + self.cnx.create_aggregate("mysumfinalizeexception", 1, MySumFinalizeExeption) + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.InterfaceError: + pass + + def CheckIntFunction(self): + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", 5) + self.cur.execute("-- types int") + self.cur.execute("select intreturner(a) as a from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.a, types.IntType), + "The result should have been an int.") + self.failUnlessEqual(res.a, 10, + "The function returned the wrong result.") + + def CheckFloatFunction(self): + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", 5.0) + self.cur.execute("-- types float") + self.cur.execute("select floatreturner(a) as a from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.a, types.FloatType), + "The result should have been a float.") + self.failUnlessEqual(res.a, 5.0 * 2.0, + "The function returned the wrong result.") + + def CheckStringFunction(self): + mystr = "test" + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", mystr) + self.cur.execute("-- types str") + self.cur.execute("select stringreturner(a) as a from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.a, types.StringType), + "The result should have been a string.") + self.failUnlessEqual(res.a, "[%s]" % mystr, + "The function returned the wrong result.") + + def CheckNullFunction(self): + mystr = "test" + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", mystr) + self.cur.execute("-- types str") + self.cur.execute("select nullreturner(a) as a from test") + res = self.cur.fetchone() + self.failUnlessEqual(res.a, None, + "The result should have been None.") + + def CheckFunctionWithNullArgument(self): + mystr = "test" + self.cur.execute("-- types str") + self.cur.execute("select nullreturner(NULL) as a") + res = self.cur.fetchone() + self.failUnlessEqual(res.a, None, + "The result should have been None.") + + + def CheckExceptionFunction(self): + mystr = "test" + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", mystr) + self.cur.execute("-- types str") + try: + self.cur.execute("select exceptionreturner(a) as a from test") + except sqlite.DatabaseError, reason: + pass + except Exception, reason: + self.fail("Wrong exception raised: %s", sys.exc_info()[0]) + + def CheckAggregateBasic(self): + self.cur.execute("create table test (a)") + self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)]) + self.cur.execute("-- types int") + self.cur.execute("select mysum(a) as sum from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.sum, types.IntType), + "The result should have been an int.") + self.failUnlessEqual(res.sum, 60, + "The function returned the wrong result.") + + def CheckAggregateFloat(self): + self.cur.execute("create table test (a)") + self.cur.executemany("insert into test(a) values (%s)", [(10.0,), (20.0,), (30.0,)]) + self.cur.execute("-- types float") + self.cur.execute("select mysumfloat(a) as sum from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.sum, types.FloatType), + "The result should have been an float.") + if res.sum <= 59.9 or res.sum >= 60.1: + self.fail("The function returned the wrong result.") + + def CheckAggregateReturnNull(self): + self.cur.execute("create table test (a)") + self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)]) + self.cur.execute("-- types int") + self.cur.execute("select mysumreturnnull(a) as sum from test") + res = self.cur.fetchone() + self.failUnlessEqual(res.sum, None, + "The result should have been None.") + + def CheckAggregateStepException(self): + self.cur.execute("create table test (a)") + self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)]) + self.cur.execute("-- types int") + try: + self.cur.execute("select mysumstepexception(a) as sum from test") + except sqlite.DatabaseError, reason: + pass + except Exception, reason: + self.fail("Wrong exception raised: %s" % sys.exc_info()[0]) + + def CheckAggregateFinalizeException(self): + self.cur.execute("create table test (a)") + self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)]) + self.cur.execute("-- types int") + try: + self.cur.execute("select mysumfinalizeexception(a) as sum from test") + except sqlite.DatabaseError, reason: + pass + except Exception, reason: + self.fail("Wrong exception raised: %s", sys.exc_info()[0]) + + def CheckAggregateStepNullArgument(self): + self.cur.execute("-- types int") + self.cur.execute("select mysum(NULL) as a") + res = self.cur.fetchone() + self.failUnlessEqual(res.a, 0, + "The result should have been 0.") + + +def suite(): + user_functions = unittest.makeSuite(UserFunctions, "Check") + test_suite = unittest.TestSuite((user_functions,)) + return test_suite + +def main(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + main() -- cgit v1.2.3