diff options
Diffstat (limited to 'test/userfunction_tests.py')
-rw-r--r-- | test/userfunction_tests.py | 259 |
1 files changed, 259 insertions, 0 deletions
diff --git a/test/userfunction_tests.py b/test/userfunction_tests.py new file mode 100644 index 0000000..ce70387 --- /dev/null +++ b/test/userfunction_tests.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python +import testsupport +import os, string, sys, types, unittest +import sqlite + +def intreturner(x): + return int(x) * 2 + +def floatreturner(x): + return float(x) * 2.0 + +def stringreturner(x): + return "[%s]" % x + +def nullreturner(x): + return None + +def exceptionreturner(x): + return 5 / 0 + +class MySum: + def __init__(self): + self.reset() + + def reset(self): + self.sum = 0 + + def step(self, x): + self.sum += int(x) + + def finalize(self): + val = self.sum + self.reset() + return val + +class MySumFloat: + def __init__(self): + self.reset() + + def reset(self): + self.sum = 0.0 + + def step(self, x): + self.sum += float(x) + + def finalize(self): + val = self.sum + self.reset() + return val + +class MySumReturnNull: + def __init__(self): + self.reset() + + def reset(self): + self.sum = 0 + + def step(self, x): + self.sum += int(x) + + def finalize(self): + return None + +class MySumStepExeption: + def __init__(self): + self.reset() + + def reset(self): + self.sum = 0 + + def step(self, x): + self.sum += int(x) / 0 + + def finalize(self): + val = self.sum + self.reset() + return val + +class MySumFinalizeExeption: + def __init__(self): + self.reset() + + def reset(self): + self.sum = 0 + + def step(self, x): + self.sum += int(x) + + def finalize(self): + val = self.sum / 0 + self.reset() + return val + +class UserFunctions(unittest.TestCase, testsupport.TestSupport): + def setUp(self): + self.filename = self.getfilename() + self.cnx = sqlite.connect(self.filename) + + sqlite._sqlite.enable_callback_debugging(0) + + self.cnx.create_function("intreturner", 1, intreturner) + self.cnx.create_function("floatreturner", 1, floatreturner) + self.cnx.create_function("stringreturner", 1, stringreturner) + self.cnx.create_function("nullreturner", 1, nullreturner) + self.cnx.create_function("exceptionreturner", 1, exceptionreturner) + + self.cnx.create_aggregate("mysum", 1, MySum) + self.cnx.create_aggregate("mysumfloat", 1, MySumFloat) + self.cnx.create_aggregate("mysumreturnnull", 1, MySumReturnNull ) + self.cnx.create_aggregate("mysumstepexception", 1, MySumStepExeption) + self.cnx.create_aggregate("mysumfinalizeexception", 1, MySumFinalizeExeption) + self.cur = self.cnx.cursor() + + def tearDown(self): + try: + self.cnx.close() + self.removefile() + except AttributeError: + pass + except sqlite.InterfaceError: + pass + + def CheckIntFunction(self): + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", 5) + self.cur.execute("-- types int") + self.cur.execute("select intreturner(a) as a from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.a, types.IntType), + "The result should have been an int.") + self.failUnlessEqual(res.a, 10, + "The function returned the wrong result.") + + def CheckFloatFunction(self): + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", 5.0) + self.cur.execute("-- types float") + self.cur.execute("select floatreturner(a) as a from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.a, types.FloatType), + "The result should have been a float.") + self.failUnlessEqual(res.a, 5.0 * 2.0, + "The function returned the wrong result.") + + def CheckStringFunction(self): + mystr = "test" + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", mystr) + self.cur.execute("-- types str") + self.cur.execute("select stringreturner(a) as a from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.a, types.StringType), + "The result should have been a string.") + self.failUnlessEqual(res.a, "[%s]" % mystr, + "The function returned the wrong result.") + + def CheckNullFunction(self): + mystr = "test" + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", mystr) + self.cur.execute("-- types str") + self.cur.execute("select nullreturner(a) as a from test") + res = self.cur.fetchone() + self.failUnlessEqual(res.a, None, + "The result should have been None.") + + def CheckFunctionWithNullArgument(self): + mystr = "test" + self.cur.execute("-- types str") + self.cur.execute("select nullreturner(NULL) as a") + res = self.cur.fetchone() + self.failUnlessEqual(res.a, None, + "The result should have been None.") + + + def CheckExceptionFunction(self): + mystr = "test" + self.cur.execute("create table test (a)") + self.cur.execute("insert into test(a) values (%s)", mystr) + self.cur.execute("-- types str") + try: + self.cur.execute("select exceptionreturner(a) as a from test") + except sqlite.DatabaseError, reason: + pass + except Exception, reason: + self.fail("Wrong exception raised: %s", sys.exc_info()[0]) + + def CheckAggregateBasic(self): + self.cur.execute("create table test (a)") + self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)]) + self.cur.execute("-- types int") + self.cur.execute("select mysum(a) as sum from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.sum, types.IntType), + "The result should have been an int.") + self.failUnlessEqual(res.sum, 60, + "The function returned the wrong result.") + + def CheckAggregateFloat(self): + self.cur.execute("create table test (a)") + self.cur.executemany("insert into test(a) values (%s)", [(10.0,), (20.0,), (30.0,)]) + self.cur.execute("-- types float") + self.cur.execute("select mysumfloat(a) as sum from test") + res = self.cur.fetchone() + self.failUnless(isinstance(res.sum, types.FloatType), + "The result should have been an float.") + if res.sum <= 59.9 or res.sum >= 60.1: + self.fail("The function returned the wrong result.") + + def CheckAggregateReturnNull(self): + self.cur.execute("create table test (a)") + self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)]) + self.cur.execute("-- types int") + self.cur.execute("select mysumreturnnull(a) as sum from test") + res = self.cur.fetchone() + self.failUnlessEqual(res.sum, None, + "The result should have been None.") + + def CheckAggregateStepException(self): + self.cur.execute("create table test (a)") + self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)]) + self.cur.execute("-- types int") + try: + self.cur.execute("select mysumstepexception(a) as sum from test") + except sqlite.DatabaseError, reason: + pass + except Exception, reason: + self.fail("Wrong exception raised: %s" % sys.exc_info()[0]) + + def CheckAggregateFinalizeException(self): + self.cur.execute("create table test (a)") + self.cur.executemany("insert into test(a) values (%s)", [(10,), (20,), (30,)]) + self.cur.execute("-- types int") + try: + self.cur.execute("select mysumfinalizeexception(a) as sum from test") + except sqlite.DatabaseError, reason: + pass + except Exception, reason: + self.fail("Wrong exception raised: %s", sys.exc_info()[0]) + + def CheckAggregateStepNullArgument(self): + self.cur.execute("-- types int") + self.cur.execute("select mysum(NULL) as a") + res = self.cur.fetchone() + self.failUnlessEqual(res.a, 0, + "The result should have been 0.") + + +def suite(): + user_functions = unittest.makeSuite(UserFunctions, "Check") + test_suite = unittest.TestSuite((user_functions,)) + return test_suite + +def main(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + main() |