summaryrefslogtreecommitdiff
path: root/test/type_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/type_tests.py')
-rw-r--r--test/type_tests.py342
1 files changed, 342 insertions, 0 deletions
diff --git a/test/type_tests.py b/test/type_tests.py
new file mode 100644
index 0000000..a8d2031
--- /dev/null
+++ b/test/type_tests.py
@@ -0,0 +1,342 @@
+#!/usr/bin/env python
+#-*- coding: ISO-8859-1 -*-
+import testsupport
+import os, string, sys, types, unittest
+import sqlite
+import _sqlite
+
+try:
+ from mx.DateTime import Date, Time, DateTime, DateTimeDelta, DateFrom, \
+ TimeFrom, DateTimeDeltaFrom
+ have_datetime = 1
+except ImportError:
+ have_datetime = 0
+
+def sqlite_is_at_least(major, minor, micro):
+ version = map(int, _sqlite.sqlite_version().split("."))
+ return version >= (major, minor, micro)
+
+class MyType:
+ def __init__(self, val):
+ self.val = int(val)
+
+ def _quote(self):
+ return str(self.val)
+
+ def __repr__(self):
+ return "MyType(%s)" % self.val
+
+ def __cmp__(self, other):
+ assert(isinstance(other, MyType))
+ return cmp(self.val, other.val)
+
+class MyTypeNew(MyType):
+ def __quote__(self):
+ return str(self.val)
+
+ def __getattr__(self, key):
+ # Forbid access to the old-style _quote method
+ if key == "_quote":
+ raise AttributeError
+ else:
+ return self.__dict__[key]
+
+class ExpectedTypes(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, converters={"mytype": MyType})
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckExpectedTypesStandardTypes(self):
+ self.cur.execute("create table test (a, b, c)")
+ self.cur.execute("insert into test(a, b, c) values (5, 6.3, 'hello')")
+ self.cur.execute("-- types int, float, str")
+ self.cur.execute("select * from test")
+ res = self.cur.fetchone()
+ self.failUnless(isinstance(res.a, types.IntType),
+ "The built-in int converter didn't work.")
+ self.failUnless(isinstance(res.b, types.FloatType),
+ "The built-in float converter didn't work.")
+ self.failUnless(isinstance(res.c, types.StringType),
+ "The built-in string converter didn't work.")
+
+ def CheckExpectedTypesStandardTypesNull(self):
+ self.cur.execute("create table test (a, b, c)")
+ self.cur.execute("insert into test(a, b, c) values (NULL, NULL, NULL)")
+ self.cur.execute("-- types int, float, str")
+ self.cur.execute("select * from test")
+ res = self.cur.fetchone()
+ self.failUnless(res.a == None,
+ "The built-in int converter should have returned None.")
+ self.failUnless(res.b == None,
+ "The built-in float converter should have returned None.")
+ self.failUnless(res.c == None,
+ "The built-in string converter should have returned None.")
+
+ def CheckExpectedTypesCustomTypes(self):
+ value = MyType(10)
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values (%s)", value)
+ self.cur.execute("-- types mytype")
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+
+ self.failUnless(isinstance(res.a, MyType),
+ "The converter did return the wrong type.")
+ self.failUnlessEqual(value, res.a,
+ "The returned value and the inserted one are different.")
+
+ def CheckNewQuoteMethod(self):
+ value = MyTypeNew(10)
+ self.cur.execute("create table test (a integer)")
+ self.cur.execute("insert into test(a) values (%s)", value)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+
+ self.failUnlessEqual(10, res.a,
+ "The returned value and the inserted one are different.")
+
+ def CheckExpectedTypesCustomTypesNull(self):
+ value = None
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values (%s)", value)
+ self.cur.execute("-- types mytype")
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+
+ self.failUnless(res.a == None,
+ "The converter should have returned None.")
+
+ def CheckResetExpectedTypes(self):
+ self.cur.execute("create table test (a)")
+ self.cur.execute("insert into test(a) values ('5')")
+ self.cur.execute("-- types int")
+ self.cur.execute("select a from test")
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.assert_(isinstance(res.a, types.StringType),
+ "'resetting types' didn't succeed.")
+
+ if have_datetime:
+ def CheckDateTypes(self):
+ dt = DateTime(2002, 6, 15)
+ dtd = DateTimeDelta(0, 0, 0, 1)
+
+ self.cur.execute("create table test (t timestamp)")
+ self.cur.execute("insert into test(t) values (%s)", (dt,))
+ self.cur.execute("select t from test")
+ res = self.cur.fetchone()
+
+ self.failUnlessEqual(dt, res.t,
+ "DateTime object should have been %s, was %s"
+ % (repr(dt), repr(res.t)))
+
+ self.cur.execute("drop table test")
+ self.cur.execute("create table test(i interval)")
+ self.cur.execute("insert into test(i) values (%s)", (dtd,))
+ self.cur.execute("select i from test")
+ res = self.cur.fetchone()
+
+ self.failUnlessEqual(dtd, res.i,
+ "DateTimeDelta object should have been %s, was %s"
+ % (repr(dtd), repr(res.i)))
+
+class UnicodeTestsLatin1(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, encoding=("iso-8859-1",))
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckGetSameBack(self):
+ test_str = unicode("sterreich", "latin1")
+ self.cur.execute("create table test (a UNICODE)")
+ self.cur.execute("insert into test(a) values (%s)", test_str)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type(test_str), type(res.a),
+ "Something other than a Unicode string was fetched: %s"
+ % (str(type(res.a))))
+ self.failUnlessEqual(test_str, res.a,
+ "Fetching the unicode string doesn't return the inserted one.")
+
+class UnicodeTestsUtf8(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, encoding="utf-8")
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckGetSameBack(self):
+ # PREZIDENT ROSSI'SKO' FEDERACII sterreich
+ test_str = unicode("ПРЕЗИДЕНТ РОССИЙСКОЙ ФЕДЕРАЦИИ Österreich", "utf-8")
+
+ self.cur.execute("create table test (a UNICODE)")
+ self.cur.execute("insert into test(a) values (%s)", test_str)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type(test_str), type(res.a),
+ "Something other than a Unicode string was fetched: %s"
+ % (str(type(res.a))))
+ self.failUnlessEqual(test_str, res.a,
+ "Fetching the unicode string doesn't return the inserted one.")
+
+class UnicodeTestsKOI8R(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, encoding="koi8-r")
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckGetSameBack(self):
+ # PREZIDENT ROSSI'SKO' FEDERACII
+ # (President of the Russian Federation)
+ test_str = unicode(" ", "koi8-r")
+
+ self.cur.execute("create table test (a UNICODE)")
+ self.cur.execute("insert into test(a) values (%s)", test_str)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type(test_str), type(res.a),
+ "Something other than a Unicode string was fetched: %s"
+ % (str(type(res.a))))
+ self.failUnlessEqual(test_str, res.a,
+ "Fetching the unicode string doesn't return the inserted one.")
+
+class SQLiteBuiltinTypeSupport(unittest.TestCase, testsupport.TestSupport):
+ def setUp(self):
+ self.filename = self.getfilename()
+ self.cnx = sqlite.connect(self.filename, encoding="koi8-r")
+ self.cur = self.cnx.cursor()
+
+ def tearDown(self):
+ try:
+ self.cnx.close()
+ self.removefile()
+ except AttributeError:
+ pass
+ except sqlite.InterfaceError:
+ pass
+
+ def CheckInt(self):
+ self.cur.execute("create table test (a INTEGER)")
+ self.cur.execute("insert into test(a) values (%s)", 5)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type(5), type(res.a),
+ "Something other than an INTEGER was fetched: %s"
+ % (str(type(res.a))))
+
+ def CheckFloat(self):
+ self.cur.execute("create table test (a FLOAT)")
+ self.cur.execute("insert into test(a) values (%s)", 5.7)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type(5.7), type(res.a),
+ "Something other than a FLOAT was fetched: %s"
+ % (str(type(res.a))))
+
+ def CheckString(self):
+ self.cur.execute("create table test (a VARCHAR(20))")
+ self.cur.execute("insert into test(a) values (%s)", "foo")
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(type("foo"), type(res.a),
+ "Something other than a VARCHAR was fetched: %s"
+ % (str(type(res.a))))
+
+ def CheckBinary(self):
+ bindata = "".join([chr(x) for x in range(256)])
+ self.cur.execute("create table test(b BINARY)")
+ self.cur.execute("insert into test(b) values (%s)", sqlite.Binary(bindata))
+ self.cur.execute("select b from test")
+ res = self.cur.fetchone()
+ self.failUnlessEqual(bindata, res.b, "Binary roundtrip didn't produce original string")
+ self.failUnlessEqual(self.cur.description[0][1], sqlite.BINARY, "Wrong type code")
+
+ if have_datetime:
+ def CheckDate(self):
+ self.cur.execute("create table test (a DATE)")
+ d = DateFrom("2002-05-07")
+ self.cur.execute("insert into test(a) values (%s)", d)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ if res.a != d:
+ self.fail("didn't get back the same DATE")
+
+ def CheckTime(self):
+ self.cur.execute("create table test (a TIME)")
+ t = TimeFrom("22:15:00")
+ self.cur.execute("insert into test(a) values (%s)", t)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ if res.a != t:
+ self.fail("didn't get back the same TIME")
+
+ def CheckTimestamp(self):
+ self.cur.execute("create table test (a TIMESTAMP)")
+ d = DateFrom("2002-05-07 22:15:00")
+ self.cur.execute("insert into test(a) values (%s)", d)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ if res.a != d:
+ self.fail("didn't get back the same TIMESTAMP")
+
+ def CheckInterval(self):
+ self.cur.execute("create table test (a INTERVAL)")
+ d = DateTimeDeltaFrom("02:00:00")
+ self.cur.execute("insert into test(a) values (%s)", d)
+ self.cur.execute("select a from test")
+ res = self.cur.fetchone()
+ if res.a != d:
+ self.fail("didn't get back the same INTERVAL")
+
+def suite():
+ expected_suite = unittest.makeSuite(ExpectedTypes, "Check")
+ unicode_suite1 = unittest.makeSuite(UnicodeTestsLatin1, "Check")
+ unicode_suite2 = unittest.makeSuite(UnicodeTestsUtf8, "Check")
+ unicode_suite3 = unittest.makeSuite(UnicodeTestsKOI8R, "Check")
+ builtin_suite = unittest.makeSuite(SQLiteBuiltinTypeSupport, "Check")
+
+ return unittest.TestSuite((expected_suite, unicode_suite1, unicode_suite2,
+ unicode_suite3, builtin_suite))
+
+def main():
+ runner = unittest.TextTestRunner()
+ runner.run(suite())
+
+if __name__ == "__main__":
+ main()