summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMicah Anderson <micah@riseup.net>2013-11-17 17:25:01 -0500
committerMicah Anderson <micah@riseup.net>2013-11-17 17:25:01 -0500
commit79d16d72cd530acbee682ebee44d5b1d2010c661 (patch)
treed04b84f51e5023cc7382ec91674e2967ffb533c7 /test
initial import of debian package to build with autobuilderupstream
Diffstat (limited to 'test')
-rw-r--r--test/all_tests.py23
-rw-r--r--test/api_tests.py524
-rw-r--r--test/logging_tests.py80
-rw-r--r--test/lowlevel_tests.py162
-rw-r--r--test/pgresultset_tests.py202
-rw-r--r--test/testsupport.py14
-rw-r--r--test/transaction_tests.py106
-rw-r--r--test/type_tests.py342
-rw-r--r--test/userfunction_tests.py259
9 files changed, 1712 insertions, 0 deletions
diff --git a/test/all_tests.py b/test/all_tests.py
new file mode 100644
index 0000000..df1a03d
--- /dev/null
+++ b/test/all_tests.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+"""
+This combines all PySQLite test suites into one big one.
+"""
+
+import unittest, sys
+import api_tests, logging_tests, lowlevel_tests, pgresultset_tests, type_tests
+import userfunction_tests, transaction_tests
+
+def suite():
+ suite = unittest.TestSuite((lowlevel_tests.suite(), api_tests.suite(),
+ type_tests.suite(), userfunction_tests.suite(),
+ transaction_tests.suite(), pgresultset_tests.suite(),
+ logging_tests.suite()))
+
+ return suite
+
+def main():
+ runner = unittest.TextTestRunner()
+ runner.run(suite())
+
+if __name__ == "__main__":
+ main()
diff --git a/test/api_tests.py b/test/api_tests.py
new file mode 100644
index 0000000..78f7c7f
--- /dev/null
+++ b/test/api_tests.py
@@ -0,0 +1,524 @@
+#!/usr/bin/env python
+import testsupport
+import os, string, sys, types, unittest, weakref
+import sqlite
+
+class DBAPICompliance(unittest.TestCase):
+ def CheckAPILevel(self):
+ self.assertEqual(sqlite.apilevel, '2.0',
+ 'apilevel is %s, should be 2.0' % sqlite.apilevel)
+
+ def CheckThreadSafety(self):
+ self.assertEqual(sqlite.threadsafety, 1,
+ 'threadsafety is %d, should be 1' % sqlite.threadsafety)
+
+ def CheckParamStyle(self):
+ self.assertEqual(sqlite.paramstyle, 'pyformat',
+ 'paramstyle is "%s", should be "pyformat"' %
+ sqlite.paramstyle)
+
+ def CheckWarning(self):
+ self.assert_(issubclass(sqlite.Warning, StandardError),
+ 'Warning is not a subclass of StandardError')
+
+ def CheckError(self):
+ self.failUnless(issubclass(sqlite.Error, StandardError),
+ 'Error is not a subclass of StandardError')
+
+ def CheckInterfaceError(self):
+ self.failUnless(issubclass(sqlite.InterfaceError, sqlite.Error),
+ 'InterfaceError is not a subclass of Error')
+
+ def CheckDatabaseError(self):
+ self.failUnless(issubclass(sqlite.DatabaseError, sqlite.Error),
+ 'DatabaseError is not a subclass of Error')
+
+ def CheckDataError(self):
+ self.failUnless(issubclass(sqlite.DataError, sqlite.DatabaseError),
+ 'DataError is not a subclass of DatabaseError')
+
+ def CheckOperationalError(self):
+ self.failUnless(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
+ 'OperationalError is not a subclass of DatabaseError')
+
+ def CheckIntegrityError(self):
+ self.failUnless(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
+ 'IntegrityError is not a subclass of DatabaseError')
+
+ def CheckInternalError(self):
+ self.failUnless(issubclass(sqlite.InternalError, sqlite.DatabaseError),
+ 'InternalError is not a subclass of DatabaseError')
+
+ def CheckProgrammingError(self):
+ self.failUnless(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
+ 'ProgrammingError is not a subclass of DatabaseError')
+
+ def CheckNotSupportedError(self):
+ self.failUnless(issubclass(sqlite.NotSupportedError,
+ sqlite.DatabaseError),
+ 'NotSupportedError is not a subclass of DatabaseError')
+
+class moduleTestCases(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename)
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.ProgrammingError:
+ pass
+
+ def CheckConnectionObject(self):
+ self.assert_(isinstance(self.cnx, sqlite.Connection),
+ 'sqlite.connect did not return a Connection object')
+
+ def CheckConnectionClose(self):
+ self.assert_(hasattr(self.cnx, 'close') and
+ type(self.cnx.close) == types.MethodType,
+ 'close is not a method of Connection')
+ self.cnx.close()
+ self.removefile()
+ self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.close)
+
+ def CheckConnectionCommit(self):
+ self.assert_(hasattr(self.cnx, "commit") and
+ type(self.cnx.commit) == types.MethodType,
+ 'commit is not a method of Connection')
+ self.cnx.close()
+ self.removefile()
+ self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.commit)
+
+ def CheckConnectionRollback(self):
+ self.assert_(hasattr(self.cnx, "rollback") and
+ type(self.cnx.rollback) == types.MethodType,
+ 'rollback is not a method of Connection')
+ self.cnx.close()
+ self.removefile()
+ self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.rollback)
+
+ def CheckConnectionCursor(self):
+ self.assert_(hasattr(self.cnx, "cursor") and
+ type(self.cnx.cursor) == types.MethodType,
+ 'cursor is not a method of Connection')
+ self.cnx.close()
+ self.removefile()
+ self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.cursor)
+
+ def CheckCloseConnection(self):
+ self.cnx.close()
+ self.removefile()
+
+ def CheckCursorObject(self):
+ self.assert_(isinstance(self.cur, sqlite.Cursor),
+ 'cnx.cursor() did not return a Cursor instance')
+
+ def CheckCursorArraysize(self):
+ self.assert_(self.cur.arraysize == 1,
+ 'cur.arraysize is %d, it should be 1' %
+ self.cur.arraysize)
+
+ def CheckCursorDescription(self):
+ self.assert_(self.cur.description == None,
+ "cur.description should be None at this point, it isn't.")
+
+ def CheckCursorDescriptionNoRow(self):
+ """
+ cursor.description should at least provide the column name(s), even if
+ no row returned.
+ """
+ self.cur.execute("create table test(a, b)")
+ self.cur.execute("select a, b from test")
+ self.assert_(self.cur.description[0][0] == "a")
+ self.assert_(self.cur.description[1][0] == "b")
+
+ def CheckCursorRowcount(self):
+ self.assert_(self.cur.rowcount == -1,
+ 'cur.rowcount is %d, should be -1' % self.cur.rowcount)
+
+ def CheckCursorClose(self):
+ self.assert_(hasattr(self.cur, "close") and
+ type(self.cur.close) == types.MethodType,
+ 'close is not a method of the Cursor object')
+ self.cur.close()
+ self.failUnlessRaises(sqlite.ProgrammingError, self.cur.close)
+
+ def CheckCursorExecute(self):
+ self.assert_(hasattr(self.cur, "execute") and
+ type(self.cur.execute) == types.MethodType,
+ 'execute is not a method of the Cursor object')
+ self.cur.close()
+ self.failUnlessRaises(sqlite.ProgrammingError,
+ self.cur.execute, 'SELECT max(3,4)')
+
+ def CheckCursorExecutemany(self):
+ self.assert_(hasattr(self.cur, "executemany") and
+ type(self.cur.executemany) == types.MethodType,
+ 'executemany is not a method of the Cursor object')
+
+ self.cur.close()
+ self.failUnlessRaises(sqlite.ProgrammingError,
+ self.cur.executemany, 'SELECT max(3,4)', [1,2])
+
+ def CheckCursorFetchone(self):
+ self.assert_(hasattr(self.cur, "fetchone") and
+ type(self.cur.fetchone) == types.MethodType,
+ 'fetchone is not a method of the Cursor object')
+ self.cur.close()
+ self.failUnlessRaises(sqlite.ProgrammingError, self.cur.fetchone)
+
+ def CheckCursorFetchMany(self):
+ self.failUnless(hasattr(self.cur, "fetchmany") and
+ type(self.cur.fetchmany) == types.MethodType,
+ 'fetchmany is not a method of the Cursor object')
+
+ cursor = self.cnx.cursor()
+ cursor.execute("create table test(id int)")
+ cursor.executemany("insert into test(id) values (%s)", range(10))
+ cursor.execute("select id from test")
+ res = cursor.fetchmany()
+ self.failUnlessEqual(len(res), 1, """fetchmany should have returned a
+ list of length 1, but the list was %i elements long""" % len(res))
+ res = cursor.fetchmany(2)
+ self.failUnlessEqual(len(res), 2, """fetchmany should have returned a
+ list of length 2, but the list was %i elements long""" % len(res))
+ cursor.arraysize = 5
+ res = cursor.fetchmany()
+ self.failUnlessEqual(len(res), 5, """fetchmany should have returned a
+ list of length 5, but the list was %i elements long""" % len(res))
+
+ self.cur.close()
+ self.failUnlessRaises(sqlite.ProgrammingError,
+ self.cur.fetchmany, 10)
+
+ def CheckCursorFetchall(self):
+ self.failUnless(hasattr(self.cur, "fetchall") and
+ type(self.cur.fetchall) == types.MethodType,
+ 'fetchall is not a method of the Cursor object')
+ self.cur.close()
+ self.failUnlessRaises(sqlite.ProgrammingError,
+ self.cur.fetchall)
+
+ def CheckCursorSetoutputsize(self):
+ self.failUnless(hasattr(self.cur, "setoutputsize") and
+ type(self.cur.setoutputsize) == types.MethodType,
+ 'setoutputsize is not a method of the Cursor object')
+ self.cur.close()
+ self.failUnlessRaises(sqlite.ProgrammingError,
+ self.cur.setoutputsize, 1024)
+
+ def CheckCursorSetinputsizes(self):
+ self.failUnless(hasattr(self.cur, "setinputsizes") and
+ type(self.cur.setinputsizes) == types.MethodType,
+ 'setinputsizes is not a method of the Cursor object')
+ self.cur.close()
+ self.failUnlessRaises(sqlite.ProgrammingError,
+ self.cur.setinputsizes, [1, 2, 3])
+
+ def CheckExecuteWithSingleton(self):
+ """Test execute() with a singleton string as the parameter."""
+ try:
+ self.cur.execute("select max(3,4)")
+ except StandardError, msg:
+ self.fail(msg)
+
+ self.assertEqual(type(self.cur.description), types.TupleType,
+ "cur.description should be a tuple, but isn't.")
+
+ clen = len(self.cur.description)
+ self.assertEqual(clen, 1,
+ "Length of cur.description is %d, it should be %d." %
+ (clen, 1))
+
+
+ self.assertEqual(len(self.cur.description[0]), 7,
+ "Length of cur.description[0] is %d, it should be 7." %
+ len(self.cur.description[0]))
+
+
+ self.failUnless(self.cur.description[0][0] == "max(3,4)" and
+ self.cur.description[0][1] == sqlite.NUMBER and
+ self.cur.description[0][2] == None and
+ self.cur.description[0][3] == None and
+ self.cur.description[0][4] == None and
+ self.cur.description[0][5] == None and
+ self.cur.description[0][6] == None,
+ "cur.description[0] does not match the query.")
+ self.cur.close()
+
+ def CheckExecuteWithTuple(self):
+ """Test execute() with a tuple as the parameter."""
+ try:
+ self.cur.execute("select max(%s, %s)", (4, 5))
+ except StandardError, msg:
+ self.fail(msg)
+
+ # Empty tuple
+ try:
+ self.cur.execute("select 3+4", ())
+ except StandardError, msg:
+ self.fail(msg)
+ self.cur.close()
+
+ def CheckExecuteWithDictionary(self):
+ """Test execute() with a dictionary as the parameter."""
+ try:
+ self.cur.execute("select max(%(n1)s, %(n2)s)", {"n1": 5, "n2": 6})
+ except StandardError, msg:
+ self.fail(msg)
+ self.cur.close()
+
+ def CheckQuotingOfLong(self):
+ """Test wether longs are quoted properly for SQL."""
+ try:
+ self.cur.execute("-- types long")
+ self.cur.execute("select %s + %s as x", (5L, 6L))
+ except StandardError, msg:
+ self.fail(msg)
+ res = self.cur.fetchone()
+ self.failUnlessEqual(res.x, 11L,
+ "The addition of long should have returned %i, returned %i"
+ % (11L, res.x))
+
+ def CheckCursorIterator(self):
+ self.cur.execute("create table test (id, name)")
+ self.cur.executemany("insert into test (id) values (%s)",
+ [(1,), (2,), (3,)])
+ self.cur.execute("-- types int")
+ self.cur.execute("select id from test")
+
+ if sys.version_info[:2] >= (2,2):
+ counter = 0
+ for row in self.cur:
+ if counter == 0:
+ self.failUnlessEqual(row.id, 1,
+ "row.id should have been 1, was %i" % row.id)
+ elif counter == 1:
+ self.failUnlessEqual(row.id, 2,
+ "row.id should have been 2, was %i" % row.id)
+ elif counter == 2:
+ self.failUnlessEqual(row.id, 3,
+ "row.id should have been 3, was %i" % row.id)
+ else:
+ self.fail("Iterated over too many rows.")
+ counter += 1
+ else:
+ # Python 2.1
+ counter = 0
+ try:
+ while 1:
+ row = self.cur.next()
+ if counter == 0:
+ self.failUnlessEqual(row.id, 1,
+ "row.id should have been 1, was %i" % row.id)
+ elif counter == 1:
+ self.failUnlessEqual(row.id, 2,
+ "row.id should have been 2, was %i" % row.id)
+ elif counter == 2:
+ self.failUnlessEqual(row.id, 3,
+ "row.id should have been 3, was %i" % row.id)
+ else:
+ self.fail("Iterated over too many rows.")
+ counter += 1
+ except IndexError:
+ pass
+ self.failUnlessEqual(counter, 3,
+ "Should have iterated over 3 items, was: %i" % counter)
+
+ def CheckCursorScrollAndRownumber(self):
+ self.cur.execute("create table test (id, name)")
+ values = [("foo",)] * 20
+ self.cur.executemany("insert into test (name) values (%s)", values)
+ self.cur.execute("select name from test")
+ self.failUnlessEqual(self.cur.rownumber, 0,
+ "Directly after execute, rownumber must be 0, is: %i"
+ % self.cur.rownumber)
+
+ self.cur.scroll(1, "absolute")
+ self.cur.scroll(5, "absolute")
+ self.failUnlessEqual(self.cur.rownumber, 5,
+ "rownumber should be 5, is: %i"
+ % self.cur.rownumber)
+
+ self.cur.scroll(1, "relative")
+ self.failUnlessEqual(self.cur.rownumber, 6,
+ "rownumber should be 6, is: %i"
+ % self.cur.rownumber)
+
+ self.cur.scroll(-2, "relative")
+ self.failUnlessEqual(self.cur.rownumber, 4,
+ "rownumber should be 4, is: %i"
+ % self.cur.rownumber)
+
+ self.failUnlessRaises(IndexError, self.cur.scroll, -2, "absolute")
+ self.failUnlessRaises(IndexError, self.cur.scroll, 1000, "absolute")
+
+ self.cur.scroll(10, "absolute")
+ self.failUnlessRaises(IndexError, self.cur.scroll, -11, "relative")
+
+ self.cur.scroll(10, "absolute")
+ self.failUnlessRaises(IndexError, self.cur.scroll, 30, "relative")
+
+ def CheckCursorConnection(self):
+ if not isinstance(self.cur.connection, weakref.ProxyType) and \
+ not isinstance(self.cur.connection, weakref.CallableProxyType):
+ fail("cursor.connection doesn't return the correct type")
+
+ def CheckCursorLastRowID(self):
+ self.cur.execute("create table test (id integer primary key, name)")
+
+ self.cur.execute("insert into test(name) values ('foo')")
+ self.failUnlessEqual(self.cur.lastrowid, 1,
+ "lastrowid should be 1, is %i" % self.cur.lastrowid)
+
+ self.cur.execute("insert into test(name) values ('foo')")
+ self.failUnlessEqual(self.cur.lastrowid, 2,
+ "lastrowid should be 2, is %i" % self.cur.lastrowid)
+
+ def CheckResultObject(self):
+ try:
+ self.cur.execute("select max(3,4)")
+ self.assertEqual(self.cur.rowcount, 1,
+ "cur.rowcount is %d, it should be 1." %
+ self.cur.rowcount)
+ self.res = self.cur.fetchall()
+ except StandardError, msg:
+ self.fail(msg)
+
+ self.assertEqual(type(self.res), types.ListType,
+ 'cur.fetchall() did not return a sequence.')
+
+ self.assertEqual(len(self.res), 1,
+ 'Length of the list of results is %d, it should be 1' %
+ len(self.res))
+
+ self.failUnless(isinstance(self.res[0], sqlite.PgResultSet),
+ 'cur.fetchall() did not return a list of PgResultSets.')
+
+ def CheckResultFetchone(self):
+ try:
+ self.cur.execute("select max(3,4)")
+ self.res = self.cur.fetchone()
+ self.assertEqual(self.cur.rowcount, 1,
+ 'cur.rowcount is %d, it should be 1.' %
+ self.cur.rowcount)
+ except StandardError, msg:
+ self.fail(msg)
+
+ self.failUnless(isinstance(self.res, sqlite.PgResultSet),
+ "cur.fetchone() does not return a PgResultSet.")
+
+ try:
+ self.res = self.cur.fetchone()
+ self.assertEqual(self.res, None,
+ "res should be None at this point, but it isn't.")
+ except StandardError, msg:
+ self.fail(msg)
+
+ def CheckRowCountAfterInsert(self):
+ try:
+ self.cur.execute("create table test(a)")
+ self.cur.execute("insert into test(a) values (5)")
+ self.assertEqual(self.cur.rowcount, 1,
+ 'cur.rowcount is %d, it should be 1.' %
+ self.cur.rowcount)
+ except StandardError, msg:
+ self.fail(msg)
+
+ def CheckRowCountAfterUpdate(self):
+ try:
+ self.cur.execute("create table test(a, b)")
+ self.cur.execute("insert into test(a, b) values (1, 2)")
+ self.cur.execute("insert into test(a, b) values (1, 3)")
+ self.cur.execute("insert into test(a, b) values (1, 4)")
+ self.cur.execute("update test set b=1 where a=1")
+ self.assertEqual(self.cur.rowcount, 3,
+ 'cur.rowcount is %d, it should be 3.' %
+ self.cur.rowcount)
+ except StandardError, msg:
+ self.fail(msg)
+
+ def CheckRowCountAfterDelete(self):
+ try:
+ self.cur.execute("create table test(a, b)")
+ self.cur.execute("insert into test(a, b) values (1, 2)")
+ self.cur.execute("insert into test(a, b) values (1, 3)")
+ self.cur.execute("insert into test(a, b) values (2, 4)")
+ self.cur.execute("delete from test where a=1")
+ self.assertEqual(self.cur.rowcount, 2,
+ 'cur.rowcount is %d, it should be 2.' %
+ self.cur.rowcount)
+ except StandardError, msg:
+ self.fail(msg)
+
+ def CheckSelectOfNonPrintableString(self):
+ try:
+ a = '\x01\x02\x03\x04'
+ self.cur.execute('select %s as a', a)
+ r = self.cur.fetchone()
+ self.assertEqual(len(r.a), len(a),
+ "Length of result is %d, it should be %d." %
+ (len(r.a), len(a)))
+ self.failUnless(r.a == a,
+ "Result is '%s', it should be '%s'" % (r.a, a))
+ except StandardError, msg:
+ self.fail(msg)
+
+ def CheckQuotingIntWithPercentS(self):
+ try:
+ self.cur.execute("create table test(a number)")
+ self.cur.execute("insert into test(a) values (%s)", (5,))
+ except StandardError, msg:
+ self.fail(msg)
+
+ def CheckQuotingLongWithPercentS(self):
+ try:
+ self.cur.execute("create table test(a number)")
+ self.cur.execute("insert into test(a) values (%s)", (50000000L,))
+ except StandardError, msg:
+ self.fail(msg)
+
+ def CheckQuotingFloatWithPercentS(self):
+ try:
+ self.cur.execute("create table test(a number)")
+ self.cur.execute("insert into test(a) values (%s)", (-3.24,))
+ except StandardError, msg:
+ self.fail(msg)
+
+ def CheckQuotingIntWithPyQuoting(self):
+ try:
+ self.cur.execute("create table test(a number)")
+ self.cur.execute("insert into test(a) values (%i)", (5,))
+ except StandardError, msg:
+ self.fail(msg)
+
+ def CheckQuotingLongWithPyQuoting(self):
+ try:
+ self.cur.execute("create table test(a number)")
+ self.cur.execute("insert into test(a) values (%i)", (50000000L,))
+ except StandardError, msg:
+ self.fail(msg)
+
+ def CheckQuotingFloatWithPyQuoting(self):
+ try:
+ self.cur.execute("create table test(a number)")
+ self.cur.execute("insert into test(a) values (%f)", (-3.24,))
+ except StandardError, msg:
+ self.fail(msg)
+
+def suite():
+ dbapi_suite = unittest.makeSuite(DBAPICompliance, "Check")
+ module_suite = unittest.makeSuite(moduleTestCases, "Check")
+ test_suite = unittest.TestSuite((dbapi_suite, module_suite))
+ return test_suite
+
+def main():
+ runner = unittest.TextTestRunner()
+ runner.run(suite())
+
+if __name__ == "__main__":
+ main()
diff --git a/test/logging_tests.py b/test/logging_tests.py
new file mode 100644
index 0000000..ed7bf24
--- /dev/null
+++ b/test/logging_tests.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+import testsupport
+import StringIO, unittest
+import sqlite
+
+class LogFileTemplate:
+ def write(self, s):
+ pass
+
+class LogFile:
+ def __init__(self):
+ pass
+
+def init_LogFile():
+ LogFile.write = LogFileTemplate.write
+
+class CommandLoggingTests(unittest.TestCase, testsupport.TestSupport):
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckNoWrite(self):
+ init_LogFile()
+ del LogFile.write
+ logger = LogFile()
+ try:
+ self.cnx = sqlite.connect(self.getfilename(),
+ command_logfile=logger)
+
+ self.fail("ValueError not raised")
+ except ValueError:
+ pass
+
+ def CheckWriteNotCallable(self):
+ logger = LogFile()
+ logger.write = 5
+ try:
+ self.cnx = sqlite.connect(self.getfilename(),
+ command_logfile=logger)
+
+ self.fail("ValueError not raised")
+ except ValueError:
+ pass
+
+ def CheckLoggingWorks(self):
+ logger = StringIO.StringIO()
+
+ expected_output = "\n".join([
+ "BEGIN", "CREATE TABLE TEST(FOO INTEGER)",
+ "INSERT INTO TEST(FOO) VALUES (5)",
+ "ROLLBACK"]) + "\n"
+
+ self.cnx = sqlite.connect(self.getfilename(),
+ command_logfile=logger)
+ cu = self.cnx.cursor()
+ cu.execute("CREATE TABLE TEST(FOO INTEGER)")
+ cu.execute("INSERT INTO TEST(FOO) VALUES (%i)", (5,))
+ self.cnx.rollback()
+
+ logger.seek(0)
+ real_output = logger.read()
+
+ if expected_output != real_output:
+ self.fail("Logging didn't produce expected output.")
+
+def suite():
+ command_logging_suite = unittest.makeSuite(CommandLoggingTests, "Check")
+ return command_logging_suite
+
+def main():
+ runner = unittest.TextTestRunner()
+ runner.run(suite())
+
+if __name__ == "__main__":
+ main()
diff --git a/test/lowlevel_tests.py b/test/lowlevel_tests.py
new file mode 100644
index 0000000..7a1d537
--- /dev/null
+++ b/test/lowlevel_tests.py
@@ -0,0 +1,162 @@
+#!/usr/bin/env python
+"""
+These are the tests for the low-level module _sqlite.
+
+They try to execute as much of the low-level _sqlite module as possible to
+facilitate coverage testing with the help of gcov.
+"""
+
+from __future__ import nested_scopes
+import testsupport
+import os, unittest, re
+import _sqlite
+from sqlite import ProgrammingError
+
+class lowlevelTestCases(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = _sqlite.connect(self.filename)
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ del self.cnx
+ except AttributeError:
+ pass
+ except ProgrammingError:
+ pass
+
+ def CheckModuleAttributeAccess(self):
+ for attr in dir(_sqlite):
+ _sqlite.__dict__[attr]
+
+ def CheckConnectionAttributes(self):
+ self.cnx.filename
+ self.cnx.sql
+ self.cnx.sqlite_changes()
+ self.cnx.sqlite_last_insert_rowid()
+
+ try:
+ self.cnx.foo = 7
+ self.fail("Could set attribute. Connection object should be read-only.")
+ except TypeError:
+ pass
+
+ def CheckSQLiteExec(self):
+ self.cnx.execute("create table test(id int, name varchar(20))")
+ self.cnx.execute("insert into test(id, name) values (1, 'foo')")
+ self.cnx.execute("insert into test(id, name) values (2, 'bar')")
+
+ expected_colnames = ('id', 'name')
+ expected_values = [('1', 'foo'), ('2', 'bar')]
+ failures = []
+
+ def callback(arg1, items, colnames):
+ if colnames != expected_colnames:
+ failures.append("expected colnames %s, got %s"
+ % (repr(expected_colnames), repr(colnames)))
+ if items not in expected_values:
+ failures.append("%s not in expected_values %s"
+ % (repr(items), repr(expected_values)))
+ else:
+ expected_values.pop(0)
+
+ self.cnx.sqlite_exec("select * from test", callback, None)
+ if len(failures) > 0:
+ for failure in failures:
+ self.fail(failure)
+
+ def CheckSQLiteLastInsertRowID(self):
+ self.cnx.execute("create table test(id integer primary key, name varchar(20))")
+ self.cnx.execute("insert into test(id, name) values (NULL, 'foo')")
+ self.cnx.execute("insert into test(id, name) values (NULL, 'bar')")
+ rowid = self.cnx.sqlite_last_insert_rowid()
+ self.failUnlessEqual(rowid, 2,
+ "last inserted rowid should have been %i, was %i"
+ % (2, rowid))
+
+ def CheckSQLiteChanges(self):
+ self.cnx.execute("create table test(id integer primary key, name varchar(20))")
+ self.cnx.execute("insert into test(id, name) values (NULL, 'foo')")
+ self.cnx.execute("insert into test(id, name) values (NULL, 'bar')")
+ self.cnx.execute("insert into test(id, name) values (NULL, 'baz')")
+ self.cnx.execute("delete from test where name='baz'")
+ changed = self.cnx.sqlite_changes()
+ self.failUnlessEqual(changed, 1,
+ "changed rows should have been %i, was %i"
+ % (1, changed))
+ self.cnx.execute("update test set name='foobar' where id < 10")
+ changed = self.cnx.sqlite_changes()
+ self.failUnlessEqual(changed, 2,
+ "changed rows should have been %i, was %i"
+ % (2, changed))
+
+ def CheckConnectionForProgrammingError(self):
+ self.cnx.close()
+ self.removefile()
+
+ self.failUnlessRaises(ProgrammingError, self.cnx.close)
+ self.failUnlessRaises(ProgrammingError, self.cnx.execute, "")
+
+ def CheckConnectionForNumberOfArguments(self):
+ self.failUnlessRaises(TypeError, self.cnx.close, None)
+ self.failUnlessRaises(TypeError, self.cnx.execute, None, None)
+ self.failUnlessRaises(TypeError, self.cnx.sqlite_changes, None)
+ self.failUnlessRaises(TypeError, self.cnx.sqlite_exec, None)
+ self.failUnlessRaises(TypeError, self.cnx.sqlite_last_insert_rowid, None)
+
+ def CheckConnectionDestructor(self):
+ del self.cnx
+ self.removefile()
+
+ def CheckResultObject(self):
+ create_statement = "create table test(id INTEGER, name TEXT)"
+ self.cnx.execute(create_statement)
+
+ self.failUnlessEqual(create_statement, self.cnx.sql,
+ ".sql should have been %s, was %s" % (create_statement, self.cnx.sql))
+
+ self.cnx.execute("insert into test(id, name) values (4, 'foo')")
+ self.cnx.execute("insert into test(id, name) values (5, 'bar')")
+
+ res = self.cnx.execute("select id, name from test")
+ self.failUnless(res.rowcount == 2, "Should have returned 2 rows, but was %i" % res.rowcount)
+
+ correct_col_defs = (('id', _sqlite.INTEGER, None, None, None, None, None), \
+ ('name', _sqlite.STRING, None, None, None, None, None))
+ self.assertEqual(res.col_defs, correct_col_defs,
+ "col_defs should have been %s, was %s" % (repr(correct_col_defs), repr(res.col_defs)))
+
+ correct_row_list = [(4, 'foo'), (5, 'bar')]
+ self.assertEqual(res.row_list, correct_row_list,
+ "rowlist should have been %s, was %s" % (repr(correct_row_list), repr(res.row_list)))
+
+ def CheckResultAttributes(self):
+ res = self.cnx.execute("select NULL, max(4,5)")
+ try:
+ res.foo = 7
+
+ except TypeError:
+ pass
+
+ def CheckSQLiteVersion(self):
+ try:
+ ver = _sqlite.sqlite_version()
+ except:
+ self.fail('sqlite_version() failed')
+ pat = re.compile(r'\d*\.\d*\.\d*')
+ if not re.match(pat,ver):
+ self.fail('Incorrect sqlite_version() format, '
+ 'should be digits.digits.digits, was %s'%ver)
+
+
+def suite():
+ return unittest.makeSuite(lowlevelTestCases, "Check")
+
+def main():
+ runner = unittest.TextTestRunner()
+ runner.run(suite())
+
+if __name__ == "__main__":
+ main()
diff --git a/test/pgresultset_tests.py b/test/pgresultset_tests.py
new file mode 100644
index 0000000..81f1d46
--- /dev/null
+++ b/test/pgresultset_tests.py
@@ -0,0 +1,202 @@
+#!/usr/bin/env python
+import testsupport
+import os, unittest, sys
+import sqlite
+
+class PgResultSetTests(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename)
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.ProgrammingError:
+ pass
+
+ def getResult(self):
+ try:
+ self.cur.execute("DROP TABLE TEST")
+ except sqlite.DatabaseError, reason:
+ pass
+
+ self.cur.execute("CREATE TABLE TEST (id, name, age)")
+ self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)",
+ (5, 'Alice', 29))
+ self.cur.execute("-- types int, str, int")
+ self.cur.execute("SELECT id, name, age FROM TEST")
+ return self.cur.fetchone()
+
+ def CheckAttributeAccess(self):
+ res = self.getResult()
+ if not hasattr(res, "id"):
+ self.fail("Resultset doesn't have attribute 'id'")
+ if not hasattr(res, "ID"):
+ self.fail("Resultset doesn't have attribute 'ID'")
+
+ def CheckAttributeValue(self):
+ res = self.getResult()
+ if res.id != 5:
+ self.fail("id should be 5, is %i" % res.id)
+ if res.ID != 5:
+ self.fail("ID should be 5, is %i" % res.ID)
+
+ def CheckKeyAccess(self):
+ res = self.getResult()
+ if not "id" in res:
+ self.fail("Resultset doesn't have item 'id'")
+ if not "ID" in res:
+ self.fail("Resultset doesn't have item 'ID'")
+
+ def CheckKeyValue(self):
+ res = self.getResult()
+ if res["id"] != 5:
+ self.fail("id should be 5, is %i" % res.id)
+ if res["ID"] != 5:
+ self.fail("ID should be 5, is %i" % res.ID)
+
+ def CheckIndexValue(self):
+ res = self.getResult()
+ if res[0] != 5:
+ self.fail("item 0 should be 5, is %i" % res.id)
+
+ def Check_haskey(self):
+ res = self.getResult()
+ if not res.has_key("id"):
+ self.fail("resultset should have key 'id'")
+ if not res.has_key("ID"):
+ self.fail("resultset should have key 'ID'")
+ if not res.has_key("Id"):
+ self.fail("resultset should have key 'Id'")
+
+ def Check_len(self):
+ l = len(self.getResult())
+ if l != 3:
+ self.fail("length of resultset should be 3, is %i", l)
+
+ def Check_keys(self):
+ res = self.getResult()
+ if res.keys() != ["id", "name", "age"]:
+ self.fail("keys() should return %s, returns %s" %
+ (["id", "name", "age"], res.keys()))
+
+ def Check_values(self):
+ val = self.getResult().values()
+ if val != (5, 'Alice', 29):
+ self.fail("Wrong values(): %s" % val)
+
+ def Check_items(self):
+ it = self.getResult().items()
+ if it != [("id", 5), ("name", 'Alice'), ("age", 29)]:
+ self.fail("Wrong items(): %s" % it)
+
+ def Check_get(self):
+ res = self.getResult()
+ v = res.get("id")
+ if v != 5:
+ self.fail("Wrong result for get [1]")
+
+ v = res.get("ID")
+ if v != 5:
+ self.fail("Wrong result for get [2]")
+
+ v = res.get("asdf")
+ if v is not None:
+ self.fail("Wrong result for get [3]")
+
+ v = res.get("asdf", 6)
+ if v != 6:
+ self.fail("Wrong result for get [4]")
+
+class TupleResultTests(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename)
+ self.cnx.rowclass = tuple
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.ProgrammingError:
+ pass
+
+ def getOneResult(self):
+ try:
+ self.cur.execute("DROP TABLE TEST")
+ except sqlite.DatabaseError, reason:
+ pass
+
+ self.cur.execute("CREATE TABLE TEST (id, name, age)")
+ self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)",
+ (5, 'Alice', 29))
+ self.cur.execute("-- types int, str, int")
+ self.cur.execute("SELECT id, name, age FROM TEST")
+ return self.cur.fetchone()
+
+ def getManyResults(self):
+ try:
+ self.cur.execute("DROP TABLE TEST")
+ except sqlite.DatabaseError, reason:
+ pass
+
+ self.cur.execute("CREATE TABLE TEST (id, name, age)")
+ self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)",
+ (5, 'Alice', 29))
+ self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)",
+ (5, 'Alice', 29))
+ self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)",
+ (5, 'Alice', 29))
+ self.cur.execute("-- types int, str, int")
+ self.cur.execute("SELECT id, name, age FROM TEST")
+ return self.cur.fetchmany(2)
+
+ def getAllResults(self):
+ try:
+ self.cur.execute("DROP TABLE TEST")
+ except sqlite.DatabaseError, reason:
+ pass
+
+ self.cur.execute("CREATE TABLE TEST (id, name, age)")
+ self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)",
+ (5, 'Alice', 29))
+ self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)",
+ (5, 'Alice', 29))
+ self.cur.execute("INSERT INTO TEST (id, name, age) VALUES (%s, %s, %s)",
+ (5, 'Alice', 29))
+ self.cur.execute("-- types int, str, int")
+ self.cur.execute("SELECT id, name, age FROM TEST")
+ return self.cur.fetchall()
+
+ def CheckRowTypeIsTupleFetchone(self):
+ res = self.getOneResult()
+ self.failUnless(type(res) is tuple, "Result type of row isn't a tuple")
+
+ def CheckRowTypeIsTupleFetchmany(self):
+ res = self.getManyResults()
+ self.failUnless(type(res[1]) is tuple, "Result type of row isn't a tuple")
+
+ def CheckRowTypeIsTupleFetchall(self):
+ res = self.getAllResults()
+ self.failUnless(type(res[2]) is tuple, "Result type of row isn't a tuple")
+
+def suite():
+ tests = [unittest.makeSuite(PgResultSetTests, "Check"),
+ unittest.makeSuite(PgResultSetTests, "Check")]
+ if sys.version_info >= (2,2):
+ tests.append(unittest.makeSuite(TupleResultTests, "Check"))
+ return unittest.TestSuite(tests)
+
+def main():
+ runner = unittest.TextTestRunner()
+ runner.run(suite())
+
+if __name__ == "__main__":
+ main()
diff --git a/test/testsupport.py b/test/testsupport.py
new file mode 100644
index 0000000..18d9ff4
--- /dev/null
+++ b/test/testsupport.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+import _sqlite
+import os, tempfile, unittest
+
+class TestSupport:
+ def getfilename(self):
+ if _sqlite.sqlite_version_info() >= (2, 8, 2):
+ return ":memory:"
+ else:
+ return tempfile.mktemp()
+
+ def removefile(self):
+ if self.filename != ":memory:":
+ os.remove(self.filename)
diff --git a/test/transaction_tests.py b/test/transaction_tests.py
new file mode 100644
index 0000000..57bc70f
--- /dev/null
+++ b/test/transaction_tests.py
@@ -0,0 +1,106 @@
+#!/usr/bin/env python
+import testsupport
+import os, string, sys, types, unittest
+import sqlite
+
+class TransactionTests(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename)
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckValueInTransaction(self):
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test (a) values (%s)", "foo")
+ self.cur.execute("-- types int")
+ self.cur.execute("select count(a) as count from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(res.count, 1,
+ "Wrong number of rows during transaction.")
+
+ def CheckValueAfterCommit(self):
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test (a) values (%s)", "foo")
+ self.cur.execute("-- types int")
+ self.cur.execute("select count(a) as count from test")
+ self.cnx.commit()
+ res = self.cur.fetchone()
+ self.failUnlessEqual(res.count, 1,
+ "Wrong number of rows during transaction.")
+
+ def CheckValueAfterRollback(self):
+ self.cur.execute("create table test (a)")
+ self.cnx.commit()
+ self.cur.execute("insert into test (a) values (%s)", "foo")
+ self.cnx.rollback()
+ self.cur.execute("-- types int")
+ self.cur.execute("select count(a) as count from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(res.count, 0,
+ "Wrong number of rows during transaction.")
+
+ def CheckImmediateCommit(self):
+ try:
+ self.cnx.commit()
+ except:
+ self.fail("Immediate commit raises exeption.")
+
+ def CheckImmediateRollback(self):
+ try:
+ self.cnx.rollback()
+ except:
+ self.fail("Immediate rollback raises exeption.")
+
+class AutocommitTests(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, autocommit=1)
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckCommit(self):
+ self.cur.execute("select abs(5)")
+ try:
+ self.cnx.commit()
+ except:
+ self.fail(".commit() raised an exception")
+
+ def CheckRollback(self):
+ self.cur.execute("select abs(5)")
+ self.failUnlessRaises(sqlite.ProgrammingError, self.cnx.rollback)
+
+class ChangeAutocommitTests(unittest.TestCase):
+ pass
+
+def suite():
+ transaction_tests = unittest.makeSuite(TransactionTests, "Check")
+ autocommit_tests = unittest.makeSuite(AutocommitTests, "Check")
+ change_autocommit_tests = unittest.makeSuite(ChangeAutocommitTests, "Check")
+
+ test_suite = unittest.TestSuite((transaction_tests, autocommit_tests,
+ change_autocommit_tests))
+ return test_suite
+
+def main():
+ runner = unittest.TextTestRunner()
+ runner.run(suite())
+
+if __name__ == "__main__":
+ main()
diff --git a/test/type_tests.py b/test/type_tests.py
new file mode 100644
index 0000000..a8d2031
--- /dev/null
+++ b/test/type_tests.py
@@ -0,0 +1,342 @@
+#!/usr/bin/env python
+#-*- coding: ISO-8859-1 -*-
+import testsupport
+import os, string, sys, types, unittest
+import sqlite
+import _sqlite
+
+try:
+ from mx.DateTime import Date, Time, DateTime, DateTimeDelta, DateFrom, \
+ TimeFrom, DateTimeDeltaFrom
+ have_datetime = 1
+except ImportError:
+ have_datetime = 0
+
+def sqlite_is_at_least(major, minor, micro):
+ version = map(int, _sqlite.sqlite_version().split("."))
+ return version >= (major, minor, micro)
+
+class MyType:
+ def __init__(self, val):
+ self.val = int(val)
+
+ def _quote(self):
+ return str(self.val)
+
+ def __repr__(self):
+ return "MyType(%s)" % self.val
+
+ def __cmp__(self, other):
+ assert(isinstance(other, MyType))
+ return cmp(self.val, other.val)
+
+class MyTypeNew(MyType):
+ def __quote__(self):
+ return str(self.val)
+
+ def __getattr__(self, key):
+ # Forbid access to the old-style _quote method
+ if key == "_quote":
+ raise AttributeError
+ else:
+ return self.__dict__[key]
+
+class ExpectedTypes(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, converters={"mytype": MyType})
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckExpectedTypesStandardTypes(self):
+ self.cur.execute("create table test (a, b, c)")
+ self.cur.execute("insert into test(a, b, c) values (5, 6.3, 'hello')")
+ self.cur.execute("-- types int, float, str")
+ self.cur.execute("select * from test")
+ res = self.cur.fetchone()
+ self.failUnless(isinstance(res.a, types.IntType),
+ "The built-in int converter didn't work.")
+ self.failUnless(isinstance(res.b, types.FloatType),
+ "The built-in float converter didn't work.")
+ self.failUnless(isinstance(res.c, types.StringType),
+ "The built-in string converter didn't work.")
+
+ def CheckExpectedTypesStandardTypesNull(self):
+ self.cur.execute("create table test (a, b, c)")
+ self.cur.execute("insert into test(a, b, c) values (NULL, NULL, NULL)")
+ self.cur.execute("-- types int, float, str")
+ self.cur.execute("select * from test")
+ res = self.cur.fetchone()
+ self.failUnless(res.a == None,
+ "The built-in int converter should have returned None.")
+ self.failUnless(res.b == None,
+ "The built-in float converter should have returned None.")
+ self.failUnless(res.c == None,
+ "The built-in string converter should have returned None.")
+
+ def CheckExpectedTypesCustomTypes(self):
+ value = MyType(10)
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values (%s)", value)
+ self.cur.execute("-- types mytype")
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+
+ self.failUnless(isinstance(res.a, MyType),
+ "The converter did return the wrong type.")
+ self.failUnlessEqual(value, res.a,
+ "The returned value and the inserted one are different.")
+
+ def CheckNewQuoteMethod(self):
+ value = MyTypeNew(10)
+ self.cur.execute("create table test (a integer)")
+ self.cur.execute("insert into test(a) values (%s)", value)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+
+ self.failUnlessEqual(10, res.a,
+ "The returned value and the inserted one are different.")
+
+ def CheckExpectedTypesCustomTypesNull(self):
+ value = None
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values (%s)", value)
+ self.cur.execute("-- types mytype")
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+
+ self.failUnless(res.a == None,
+ "The converter should have returned None.")
+
+ def CheckResetExpectedTypes(self):
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values ('5')")
+ self.cur.execute("-- types int")
+ self.cur.execute("select a from test")
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.assert_(isinstance(res.a, types.StringType),
+ "'resetting types' didn't succeed.")
+
+ if have_datetime:
+ def CheckDateTypes(self):
+ dt = DateTime(2002, 6, 15)
+ dtd = DateTimeDelta(0, 0, 0, 1)
+
+ self.cur.execute("create table test (t timestamp)")
+ self.cur.execute("insert into test(t) values (%s)", (dt,))
+ self.cur.execute("select t from test")
+ res = self.cur.fetchone()
+
+ self.failUnlessEqual(dt, res.t,
+ "DateTime object should have been %s, was %s"
+ % (repr(dt), repr(res.t)))
+
+ self.cur.execute("drop table test")
+ self.cur.execute("create table test(i interval)")
+ self.cur.execute("insert into test(i) values (%s)", (dtd,))
+ self.cur.execute("select i from test")
+ res = self.cur.fetchone()
+
+ self.failUnlessEqual(dtd, res.i,
+ "DateTimeDelta object should have been %s, was %s"
+ % (repr(dtd), repr(res.i)))
+
+class UnicodeTestsLatin1(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, encoding=("iso-8859-1",))
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckGetSameBack(self):
+ test_str = unicode("sterreich", "latin1")
+ self.cur.execute("create table test (a UNICODE)")
+ self.cur.execute("insert into test(a) values (%s)", test_str)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type(test_str), type(res.a),
+ "Something other than a Unicode string was fetched: %s"
+ % (str(type(res.a))))
+ self.failUnlessEqual(test_str, res.a,
+ "Fetching the unicode string doesn't return the inserted one.")
+
+class UnicodeTestsUtf8(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, encoding="utf-8")
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckGetSameBack(self):
+ # PREZIDENT ROSSI'SKO' FEDERACII sterreich
+ test_str = unicode("ПРЕЗИДЕНТ РОССИЙСКОЙ ФЕДЕРАЦИИ Österreich", "utf-8")
+
+ self.cur.execute("create table test (a UNICODE)")
+ self.cur.execute("insert into test(a) values (%s)", test_str)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type(test_str), type(res.a),
+ "Something other than a Unicode string was fetched: %s"
+ % (str(type(res.a))))
+ self.failUnlessEqual(test_str, res.a,
+ "Fetching the unicode string doesn't return the inserted one.")
+
+class UnicodeTestsKOI8R(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, encoding="koi8-r")
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckGetSameBack(self):
+ # PREZIDENT ROSSI'SKO' FEDERACII
+ # (President of the Russian Federation)
+ test_str = unicode(" ", "koi8-r")
+
+ self.cur.execute("create table test (a UNICODE)")
+ self.cur.execute("insert into test(a) values (%s)", test_str)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type(test_str), type(res.a),
+ "Something other than a Unicode string was fetched: %s"
+ % (str(type(res.a))))
+ self.failUnlessEqual(test_str, res.a,
+ "Fetching the unicode string doesn't return the inserted one.")
+
+class SQLiteBuiltinTypeSupport(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, encoding="koi8-r")
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckInt(self):
+ self.cur.execute("create table test (a INTEGER)")
+ self.cur.execute("insert into test(a) values (%s)", 5)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type(5), type(res.a),
+ "Something other than an INTEGER was fetched: %s"
+ % (str(type(res.a))))
+
+ def CheckFloat(self):
+ self.cur.execute("create table test (a FLOAT)")
+ self.cur.execute("insert into test(a) values (%s)", 5.7)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type(5.7), type(res.a),
+ "Something other than a FLOAT was fetched: %s"
+ % (str(type(res.a))))
+
+ def CheckString(self):
+ self.cur.execute("create table test (a VARCHAR(20))")
+ self.cur.execute("insert into test(a) values (%s)", "foo")
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type("foo"), type(res.a),
+ "Something other than a VARCHAR was fetched: %s"
+ % (str(type(res.a))))
+
+ def CheckBinary(self):
+ bindata = "".join([chr(x) for x in range(256)])
+ self.cur.execute("create table test(b BINARY)")
+ self.cur.execute("insert into test(b) values (%s)", sqlite.Binary(bindata))
+ self.cur.execute("select b from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(bindata, res.b, "Binary roundtrip didn't produce original string")
+ self.failUnlessEqual(self.cur.description[0][1], sqlite.BINARY, "Wrong type code")
+
+ if have_datetime:
+ def CheckDate(self):
+ self.cur.execute("create table test (a DATE)")
+ d = DateFrom("2002-05-07")
+ self.cur.execute("insert into test(a) values (%s)", d)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ if res.a != d:
+ self.fail("didn't get back the same DATE")
+
+ def CheckTime(self):
+ self.cur.execute("create table test (a TIME)")
+ t = TimeFrom("22:15:00")
+ self.cur.execute("insert into test(a) values (%s)", t)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ if res.a != t:
+ self.fail("didn't get back the same TIME")
+
+ def CheckTimestamp(self):
+ self.cur.execute("create table test (a TIMESTAMP)")
+ d = DateFrom("2002-05-07 22:15:00")
+ self.cur.execute("insert into test(a) values (%s)", d)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ if res.a != d:
+ self.fail("didn't get back the same TIMESTAMP")
+
+ def CheckInterval(self):
+ self.cur.execute("create table test (a INTERVAL)")
+ d = DateTimeDeltaFrom("02:00:00")
+ self.cur.execute("insert into test(a) values (%s)", d)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ if res.a != d:
+ self.fail("didn't get back the same INTERVAL")
+
+def suite():
+ expected_suite = unittest.makeSuite(ExpectedTypes, "Check")
+ unicode_suite1 = unittest.makeSuite(UnicodeTestsLatin1, "Check")
+ unicode_suite2 = unittest.makeSuite(UnicodeTestsUtf8, "Check")
+ unicode_suite3 = unittest.makeSuite(UnicodeTestsKOI8R, "Check")
+ builtin_suite = unittest.makeSuite(SQLiteBuiltinTypeSupport, "Check")
+
+ return unittest.TestSuite((expected_suite, unicode_suite1, unicode_suite2,
+ unicode_suite3, builtin_suite))
+
+def main():
+ runner = unittest.TextTestRunner()
+ runner.run(suite())
+
+if __name__ == "__main__":
+ main()
diff --git a/test/userfunction_tests.py b/test/userfunction_tests.py
new file mode 100644
index 0000000..ce70387
--- /dev/null
+++ b/test/userfunction_tests.py
@@ -0,0 +1,259 @@
+#!/usr/bin/env python
+import testsupport
+import os, string, sys, types, unittest
+import sqlite
+
+def intreturner(x):
+ return int(x) * 2
+
+def floatreturner(x):
+ return float(x) * 2.0
+
+def stringreturner(x):
+ return "[%s]" % x
+
+def nullreturner(x):
+ return None
+
+def exceptionreturner(x):
+ return 5 / 0
+
+class MySum:
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.sum = 0
+
+ def step(self, x):
+ self.sum += int(x)
+
+ def finalize(self):
+ val = self.sum
+ self.reset()
+ return val
+
+class MySumFloat:
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.sum = 0.0
+
+ def step(self, x):
+ self.sum += float(x)
+
+ def finalize(self):
+ val = self.sum
+ self.reset()
+ return val
+
+class MySumReturnNull:
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.sum = 0
+
+ def step(self, x):
+ self.sum += int(x)
+
+ def finalize(self):
+ return None
+
+class MySumStepExeption:
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.sum = 0
+
+ def step(self, x):
+ self.sum += int(x) / 0
+
+ def finalize(self):
+ val = self.sum
+ self.reset()
+ return val
+
+class MySumFinalizeExeption:
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.sum = 0
+
+ def step(self, x):
+ self.sum += int(x)
+
+ def finalize(self):
+ val = self.sum / 0
+ self.reset()
+ return val
+
+class UserFunctions(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename)
+
+ sqlite._sqlite.enable_callback_debugging(0)
+
+ self.cnx.create_function("intreturner", 1, intreturner)
+ self.cnx.create_function("floatreturner", 1, floatreturner)
+ self.cnx.create_function("stringreturner", 1, stringreturner)
+ self.cnx.create_function("nullreturner", 1, nullreturner)
+ self.cnx.create_function("exceptionreturner", 1, exceptionreturner)
+
+ self.cnx.create_aggregate("mysum", 1, MySum)
+ self.cnx.create_aggregate("mysumfloat", 1, MySumFloat)
+ self.cnx.create_aggregate("mysumreturnnull", 1, MySumReturnNull )
+ self.cnx.create_aggregate("mysumstepexception", 1, MySumStepExeption)
+ self.cnx.create_aggregate("mysumfinalizeexception", 1, MySumFinalizeExeption)
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckIntFunction(self):
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values (%s)", 5)
+ self.cur.execute("-- types int")
+ self.cur.execute("select intreturner(a) as a from test")
+ res = self.cur.fetchone()
+ self.failUnless(isinstance(res.a, types.IntType),
+ "The result should have been an int.")
+ self.failUnlessEqual(res.a, 10,
+ "The function returned the wrong result.")
+
+ def CheckFloatFunction(self):
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values (%s)", 5.0)
+ self.cur.execute("-- types float")
+ self.cur.execute("select floatreturner(a) as a from test")
+ res = self.cur.fetchone()
+ self.failUnless(isinstance(res.a, types.FloatType),
+ "The result should have been a float.")
+ self.failUnlessEqual(res.a, 5.0 * 2.0,
+ "The function returned the wrong result.")
+
+ def CheckStringFunction(self):
+ mystr = "test"
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values (%s)", mystr)
+ self.cur.execute("-- types str")
+ self.cur.execute("select stringreturner(a) as a from test")
+ res = self.cur.fetchone()
+ self.failUnless(isinstance(res.a, types.StringType),
+ "The result should have been a string.")
+ self.failUnlessEqual(res.a, "[%s]" % mystr,
+ "The function returned the wrong result.")
+
+ def CheckNullFunction(self):
+ mystr = "test"
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values (%s)", mystr)
+ self.cur.execute("-- types str")
+ self.cur.execute("select nullreturner(a) as a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(res.a, None,
+ "The result should have been None.")
+
+ def CheckFunctionWithNullArgument(self):
+ mystr = "test"
+ self.cur.execute("-- types str")
+ self.cur.execute("select nullreturner(NULL) as a")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(res.a, None,
+ "The result should have been None.")
+
+
+ def CheckExceptionFunction(self):
+ mystr = "test"
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values (%s)", mystr)
+ self.cur.execute("-- types str")
+ try:
+ self.cur.execute("select exceptionreturner(a) as a from test")
+ except sqlite.DatabaseError, reason:
+ pass
+ except Exception, reason:
+ self.fail("Wrong exception raised: %s", sys.exc_info()[0])
+
+ def CheckAggregateBasic(self):
+ self.cur.execute("create table test (a)")
+ self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)])
+ self.cur.execute("-- types int")
+ self.cur.execute("select mysum(a) as sum from test")
+ res = self.cur.fetchone()
+ self.failUnless(isinstance(res.sum, types.IntType),
+ "The result should have been an int.")
+ self.failUnlessEqual(res.sum, 60,
+ "The function returned the wrong result.")
+
+ def CheckAggregateFloat(self):
+ self.cur.execute("create table test (a)")
+ self.cur.executemany("insert into test(a) values (%s)", [(10.0,), (20.0,), (30.0,)])
+ self.cur.execute("-- types float")
+ self.cur.execute("select mysumfloat(a) as sum from test")
+ res = self.cur.fetchone()
+ self.failUnless(isinstance(res.sum, types.FloatType),
+ "The result should have been an float.")
+ if res.sum <= 59.9 or res.sum >= 60.1:
+ self.fail("The function returned the wrong result.")
+
+ def CheckAggregateReturnNull(self):
+ self.cur.execute("create table test (a)")
+ self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)])
+ self.cur.execute("-- types int")
+ self.cur.execute("select mysumreturnnull(a) as sum from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(res.sum, None,
+ "The result should have been None.")
+
+ def CheckAggregateStepException(self):
+ self.cur.execute("create table test (a)")
+ self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)])
+ self.cur.execute("-- types int")
+ try:
+ self.cur.execute("select mysumstepexception(a) as sum from test")
+ except sqlite.DatabaseError, reason:
+ pass
+ except Exception, reason:
+ self.fail("Wrong exception raised: %s" % sys.exc_info()[0])
+
+ def CheckAggregateFinalizeException(self):
+ self.cur.execute("create table test (a)")
+ self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)])
+ self.cur.execute("-- types int")
+ try:
+ self.cur.execute("select mysumfinalizeexception(a) as sum from test")
+ except sqlite.DatabaseError, reason:
+ pass
+ except Exception, reason:
+ self.fail("Wrong exception raised: %s", sys.exc_info()[0])
+
+ def CheckAggregateStepNullArgument(self):
+ self.cur.execute("-- types int")
+ self.cur.execute("select mysum(NULL) as a")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(res.a, 0,
+ "The result should have been 0.")
+
+
+def suite():
+ user_functions = unittest.makeSuite(UserFunctions, "Check")
+ test_suite = unittest.TestSuite((user_functions,))
+ return test_suite
+
+def main():
+ runner = unittest.TextTestRunner()
+ runner.run(suite())
+
+if __name__ == "__main__":
+ main()