diff options
author | Kali Kaneko <kali@futeisha.org> | 2013-02-04 19:20:12 +0900 |
---|---|---|
committer | Kali Kaneko <kali@futeisha.org> | 2013-02-04 19:20:12 +0900 |
commit | 4189e53a881e52de0945375a72b8748143c5bd13 (patch) | |
tree | 23ed8e0600dc3c62da7a95e50f778c79a9be1e75 /lib |
initial commit
Diffstat (limited to 'lib')
-rw-r--r-- | lib/__init__.py | 22 | ||||
-rw-r--r-- | lib/dbapi2.py | 92 | ||||
-rw-r--r-- | lib/dump.py | 63 | ||||
-rw-r--r-- | lib/test/__init__.py | 50 | ||||
-rw-r--r-- | lib/test/dbapi.py | 878 | ||||
-rw-r--r-- | lib/test/dump.py | 52 | ||||
-rw-r--r-- | lib/test/factory.py | 205 | ||||
-rw-r--r-- | lib/test/hooks.py | 188 | ||||
-rw-r--r-- | lib/test/py25/__init__.py | 1 | ||||
-rw-r--r-- | lib/test/py25/py25tests.py | 80 | ||||
-rw-r--r-- | lib/test/regression.py | 271 | ||||
-rw-r--r-- | lib/test/transactions.py | 205 | ||||
-rw-r--r-- | lib/test/types.py | 414 | ||||
-rw-r--r-- | lib/test/userfunctions.py | 413 |
14 files changed, 2934 insertions, 0 deletions
diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..3920a2b --- /dev/null +++ b/lib/__init__.py @@ -0,0 +1,22 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlcipher/__init__.py: the pysqlite2 package. +# +# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. diff --git a/lib/dbapi2.py b/lib/dbapi2.py new file mode 100644 index 0000000..78f340e --- /dev/null +++ b/lib/dbapi2.py @@ -0,0 +1,92 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlcipher/dbapi2.py: the DB-API 2.0 interface with SQLCipher support +# +# Copyright (C) Kali Kaneko <kali@futeisha.org> +# Copyright (C) 2010 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlcipher. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import datetime +import time + +try: + from pysqlcipher.libsqlite import * +except ImportError: + from pysqlcipher._sqlite import * + +paramstyle = "qmark" + +threadsafety = 1 + +apilevel = "2.0" + +Date = datetime.date + +Time = datetime.time + +Timestamp = datetime.datetime + +def DateFromTicks(ticks): + return Date(*time.localtime(ticks)[:3]) + +def TimeFromTicks(ticks): + return Time(*time.localtime(ticks)[3:6]) + +def TimestampFromTicks(ticks): + return Timestamp(*time.localtime(ticks)[:6]) + +version_info = tuple([int(x) for x in version.split(".")]) +sqlite_version_info = tuple([int(x) for x in sqlite_version.split(".")]) + +Binary = buffer + +def register_adapters_and_converters(): + def adapt_date(val): + return val.isoformat() + + def adapt_datetime(val): + return val.isoformat(" ") + + def convert_date(val): + return datetime.date(*map(int, val.split("-"))) + + def convert_timestamp(val): + datepart, timepart = val.split(" ") + year, month, day = map(int, datepart.split("-")) + timepart_full = timepart.split(".") + hours, minutes, seconds = map(int, timepart_full[0].split(":")) + if len(timepart_full) == 2: + microseconds = int(timepart_full[1]) + else: + microseconds = 0 + + val = datetime.datetime(year, month, day, hours, minutes, seconds, microseconds) + return val + + + register_adapter(datetime.date, adapt_date) + register_adapter(datetime.datetime, adapt_datetime) + register_converter("date", convert_date) + register_converter("timestamp", convert_timestamp) + +register_adapters_and_converters() + +# Clean up namespace + +del(register_adapters_and_converters) diff --git a/lib/dump.py b/lib/dump.py new file mode 100644 index 0000000..409a405 --- /dev/null +++ b/lib/dump.py @@ -0,0 +1,63 @@ +# Mimic the sqlite3 console shell's .dump command +# Author: Paul Kippes <kippesp@gmail.com> + +def _iterdump(connection): + """ + Returns an iterator to the dump of the database in an SQL text format. + + Used to produce an SQL dump of the database. Useful to save an in-memory + database for later restoration. This function should not be called + directly but instead called from the Connection method, iterdump(). + """ + + cu = connection.cursor() + yield('BEGIN TRANSACTION;') + + # sqlite_master table contains the SQL CREATE statements for the database. + q = """ + SELECT name, type, sql + FROM sqlite_master + WHERE sql NOT NULL AND + type == 'table' + """ + schema_res = cu.execute(q) + for table_name, type, sql in schema_res.fetchall(): + if table_name == 'sqlite_sequence': + yield('DELETE FROM sqlite_sequence;') + elif table_name == 'sqlite_stat1': + yield('ANALYZE sqlite_master;') + elif table_name.startswith('sqlite_'): + continue + # NOTE: Virtual table support not implemented + #elif sql.startswith('CREATE VIRTUAL TABLE'): + # qtable = table_name.replace("'", "''") + # yield("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"\ + # "VALUES('table','%s','%s',0,'%s');" % + # qtable, + # qtable, + # sql.replace("''")) + else: + yield('%s;' % sql) + + # Build the insert statement for each row of the current table + res = cu.execute("PRAGMA table_info('%s')" % table_name) + column_names = [str(table_info[1]) for table_info in res.fetchall()] + q = "SELECT 'INSERT INTO \"%(tbl_name)s\" VALUES(" + q += ",".join(["'||quote(" + col + ")||'" for col in column_names]) + q += ")' FROM '%(tbl_name)s'" + query_res = cu.execute(q % {'tbl_name': table_name}) + for row in query_res: + yield("%s;" % row[0]) + + # Now when the type is 'index', 'trigger', or 'view' + q = """ + SELECT name, type, sql + FROM sqlite_master + WHERE sql NOT NULL AND + type IN ('index', 'trigger', 'view') + """ + schema_res = cu.execute(q) + for name, type, sql in schema_res.fetchall(): + yield('%s;' % sql) + + yield('COMMIT;') diff --git a/lib/test/__init__.py b/lib/test/__init__.py new file mode 100644 index 0000000..1c760bb --- /dev/null +++ b/lib/test/__init__.py @@ -0,0 +1,50 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/__init__.py: the package containing the test suite +# +# Copyright (C) 2004-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import os, sys +import unittest + +if os.path.exists("extended_setup.py"): + print "-" * 75 + print "You should not run the test suite from the pysqlite build directory." + print "This does not work well because the extension module cannot be found." + print "Just run the test suite from somewhere else, please!" + print "-" * 75 + sys.exit(1) + +from pysqlite2.test import dbapi, types, userfunctions, factory, transactions,\ + hooks, regression, dump +from pysqlcipher import dbapi2 as sqlite + +def suite(): + tests = [dbapi.suite(), types.suite(), userfunctions.suite(), + factory.suite(), transactions.suite(), hooks.suite(), regression.suite(), dump.suite()] + if sys.version_info >= (2, 5, 0): + from pysqlite2.test.py25 import py25tests + tests.append(py25tests.suite()) + + return unittest.TestSuite(tuple(tests)) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) diff --git a/lib/test/dbapi.py b/lib/test/dbapi.py new file mode 100644 index 0000000..05dfd4b --- /dev/null +++ b/lib/test/dbapi.py @@ -0,0 +1,878 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/dbapi.py: tests for DB-API compliance +# +# Copyright (C) 2004-2009 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import unittest +import sys +import threading +import pysqlcipher.dbapi2 as sqlite + +class ModuleTests(unittest.TestCase): + def CheckAPILevel(self): + self.assertEqual(sqlite.apilevel, "2.0", + "apilevel is %s, should be 2.0" % sqlite.apilevel) + + def CheckThreadSafety(self): + self.assertEqual(sqlite.threadsafety, 1, + "threadsafety is %d, should be 1" % sqlite.threadsafety) + + def CheckParamStyle(self): + self.assertEqual(sqlite.paramstyle, "qmark", + "paramstyle is '%s', should be 'qmark'" % + sqlite.paramstyle) + + def CheckWarning(self): + self.assert_(issubclass(sqlite.Warning, StandardError), + "Warning is not a subclass of StandardError") + + def CheckError(self): + self.assertTrue(issubclass(sqlite.Error, StandardError), + "Error is not a subclass of StandardError") + + def CheckInterfaceError(self): + self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), + "InterfaceError is not a subclass of Error") + + def CheckDatabaseError(self): + self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), + "DatabaseError is not a subclass of Error") + + def CheckDataError(self): + self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), + "DataError is not a subclass of DatabaseError") + + def CheckOperationalError(self): + self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), + "OperationalError is not a subclass of DatabaseError") + + def CheckIntegrityError(self): + self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), + "IntegrityError is not a subclass of DatabaseError") + + def CheckInternalError(self): + self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), + "InternalError is not a subclass of DatabaseError") + + def CheckProgrammingError(self): + self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), + "ProgrammingError is not a subclass of DatabaseError") + + def CheckNotSupportedError(self): + self.assertTrue(issubclass(sqlite.NotSupportedError, + sqlite.DatabaseError), + "NotSupportedError is not a subclass of DatabaseError") + +class ConnectionTests(unittest.TestCase): + def setUp(self): + self.cx = sqlite.connect(":memory:") + cu = self.cx.cursor() + cu.execute("create table test(id integer primary key, name text)") + cu.execute("insert into test(name) values (?)", ("foo",)) + + def tearDown(self): + self.cx.close() + + def CheckCommit(self): + self.cx.commit() + + def CheckCommitAfterNoChanges(self): + """ + A commit should also work when no changes were made to the database. + """ + self.cx.commit() + self.cx.commit() + + def CheckRollback(self): + self.cx.rollback() + + def CheckRollbackAfterNoChanges(self): + """ + A rollback should also work when no changes were made to the database. + """ + self.cx.rollback() + self.cx.rollback() + + def CheckCursor(self): + cu = self.cx.cursor() + + def CheckFailedOpen(self): + YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" + try: + con = sqlite.connect(YOU_CANNOT_OPEN_THIS) + except sqlite.OperationalError: + return + self.fail("should have raised an OperationalError") + + def CheckClose(self): + self.cx.close() + + def CheckExceptions(self): + # Optional DB-API extension. + self.assertEqual(self.cx.Warning, sqlite.Warning) + self.assertEqual(self.cx.Error, sqlite.Error) + self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError) + self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError) + self.assertEqual(self.cx.DataError, sqlite.DataError) + self.assertEqual(self.cx.OperationalError, sqlite.OperationalError) + self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError) + self.assertEqual(self.cx.InternalError, sqlite.InternalError) + self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) + self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) + +class CursorTests(unittest.TestCase): + def setUp(self): + self.cx = sqlite.connect(":memory:") + self.cu = self.cx.cursor() + self.cu.execute("create table test(id integer primary key, name text, income number)") + self.cu.execute("insert into test(name) values (?)", ("foo",)) + + def tearDown(self): + self.cu.close() + self.cx.close() + + def CheckExecuteNoArgs(self): + self.cu.execute("delete from test") + + def CheckExecuteIllegalSql(self): + try: + self.cu.execute("select asdf") + self.fail("should have raised an OperationalError") + except sqlite.OperationalError: + return + except: + self.fail("raised wrong exception") + + def CheckExecuteTooMuchSql(self): + try: + self.cu.execute("select 5+4; select 4+5") + self.fail("should have raised a ProgrammingError") + except sqlite.ProgrammingError: + return + except: + self.fail("raised wrong exception") + + def CheckExecuteTooMuchSql2(self): + self.cu.execute("select 5+4; -- foo bar") + + def CheckExecuteTooMuchSql3(self): + self.cu.execute(""" + select 5+4; + + /* + foo + */ + """) + + def CheckExecuteWrongSqlArg(self): + try: + self.cu.execute(42) + self.fail("should have raised a ValueError") + except ValueError: + return + except: + self.fail("raised wrong exception.") + + def CheckExecuteArgInt(self): + self.cu.execute("insert into test(id) values (?)", (42,)) + + def CheckExecuteArgFloat(self): + self.cu.execute("insert into test(income) values (?)", (2500.32,)) + + def CheckExecuteArgString(self): + self.cu.execute("insert into test(name) values (?)", ("Hugo",)) + + def CheckExecuteWrongNoOfArgs1(self): + # too many parameters + try: + self.cu.execute("insert into test(id) values (?)", (17, "Egon")) + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass + + def CheckExecuteWrongNoOfArgs2(self): + # too little parameters + try: + self.cu.execute("insert into test(id) values (?)") + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass + + def CheckExecuteWrongNoOfArgs3(self): + # no parameters, parameters are needed + try: + self.cu.execute("insert into test(id) values (?)") + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass + + def CheckExecuteParamList(self): + self.cu.execute("insert into test(name) values ('foo')") + self.cu.execute("select name from test where name=?", ["foo"]) + row = self.cu.fetchone() + self.assertEqual(row[0], "foo") + + def CheckExecuteParamSequence(self): + class L(object): + def __len__(self): + return 1 + def __getitem__(self, x): + assert x == 0 + return "foo" + + self.cu.execute("insert into test(name) values ('foo')") + self.cu.execute("select name from test where name=?", L()) + row = self.cu.fetchone() + self.assertEqual(row[0], "foo") + + def CheckExecuteDictMapping(self): + self.cu.execute("insert into test(name) values ('foo')") + self.cu.execute("select name from test where name=:name", {"name": "foo"}) + row = self.cu.fetchone() + self.assertEqual(row[0], "foo") + + def CheckExecuteDictMapping_Mapping(self): + # Test only works with Python 2.5 or later + if sys.version_info < (2, 5, 0): + return + + class D(dict): + def __missing__(self, key): + return "foo" + + self.cu.execute("insert into test(name) values ('foo')") + self.cu.execute("select name from test where name=:name", D()) + row = self.cu.fetchone() + self.assertEqual(row[0], "foo") + + def CheckExecuteDictMappingTooLittleArgs(self): + self.cu.execute("insert into test(name) values ('foo')") + try: + self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass + + def CheckExecuteDictMappingNoArgs(self): + self.cu.execute("insert into test(name) values ('foo')") + try: + self.cu.execute("select name from test where name=:name") + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass + + def CheckExecuteDictMappingUnnamed(self): + self.cu.execute("insert into test(name) values ('foo')") + try: + self.cu.execute("select name from test where name=?", {"name": "foo"}) + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass + + def CheckClose(self): + self.cu.close() + + def CheckRowcountExecute(self): + self.cu.execute("delete from test") + self.cu.execute("insert into test(name) values ('foo')") + self.cu.execute("insert into test(name) values ('foo')") + self.cu.execute("update test set name='bar'") + self.assertEqual(self.cu.rowcount, 2) + + def CheckRowcountSelect(self): + """ + pysqlite does not know the rowcount of SELECT statements, because we + don't fetch all rows after executing the select statement. The rowcount + has thus to be -1. + """ + self.cu.execute("select 5 union select 6") + self.assertEqual(self.cu.rowcount, -1) + + def CheckRowcountExecutemany(self): + self.cu.execute("delete from test") + self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) + self.assertEqual(self.cu.rowcount, 3) + + def CheckTotalChanges(self): + self.cu.execute("insert into test(name) values ('foo')") + self.cu.execute("insert into test(name) values ('foo')") + if self.cx.total_changes < 2: + self.fail("total changes reported wrong value") + + # Checks for executemany: + # Sequences are required by the DB-API, iterators + # enhancements in pysqlite. + + def CheckExecuteManySequence(self): + self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) + + def CheckExecuteManyIterator(self): + class MyIter: + def __init__(self): + self.value = 5 + + def next(self): + if self.value == 10: + raise StopIteration + else: + self.value += 1 + return (self.value,) + + self.cu.executemany("insert into test(income) values (?)", MyIter()) + + def CheckExecuteManyGenerator(self): + def mygen(): + for i in range(5): + yield (i,) + + self.cu.executemany("insert into test(income) values (?)", mygen()) + + def CheckExecuteManyWrongSqlArg(self): + try: + self.cu.executemany(42, [(3,)]) + self.fail("should have raised a ValueError") + except ValueError: + return + except: + self.fail("raised wrong exception.") + + def CheckExecuteManySelect(self): + try: + self.cu.executemany("select ?", [(3,)]) + self.fail("should have raised a ProgrammingError") + except sqlite.ProgrammingError: + return + except: + self.fail("raised wrong exception.") + + def CheckExecuteManyNotIterable(self): + try: + self.cu.executemany("insert into test(income) values (?)", 42) + self.fail("should have raised a TypeError") + except TypeError: + return + except Exception, e: + print "raised", e.__class__ + self.fail("raised wrong exception.") + + def CheckFetchIter(self): + # Optional DB-API extension. + self.cu.execute("delete from test") + self.cu.execute("insert into test(id) values (?)", (5,)) + self.cu.execute("insert into test(id) values (?)", (6,)) + self.cu.execute("select id from test order by id") + lst = [] + for row in self.cu: + lst.append(row[0]) + self.assertEqual(lst[0], 5) + self.assertEqual(lst[1], 6) + + def CheckFetchone(self): + self.cu.execute("select name from test") + row = self.cu.fetchone() + self.assertEqual(row[0], "foo") + row = self.cu.fetchone() + self.assertEqual(row, None) + + def CheckFetchoneNoStatement(self): + cur = self.cx.cursor() + row = cur.fetchone() + self.assertEqual(row, None) + + def CheckArraySize(self): + # must default ot 1 + self.assertEqual(self.cu.arraysize, 1) + + # now set to 2 + self.cu.arraysize = 2 + + # now make the query return 3 rows + self.cu.execute("delete from test") + self.cu.execute("insert into test(name) values ('A')") + self.cu.execute("insert into test(name) values ('B')") + self.cu.execute("insert into test(name) values ('C')") + self.cu.execute("select name from test") + res = self.cu.fetchmany() + + self.assertEqual(len(res), 2) + + def CheckFetchmany(self): + self.cu.execute("select name from test") + res = self.cu.fetchmany(100) + self.assertEqual(len(res), 1) + res = self.cu.fetchmany(100) + self.assertEqual(res, []) + + def CheckFetchmanyKwArg(self): + """Checks if fetchmany works with keyword arguments""" + self.cu.execute("select name from test") + res = self.cu.fetchmany(size=100) + self.assertEqual(len(res), 1) + + def CheckFetchall(self): + self.cu.execute("select name from test") + res = self.cu.fetchall() + self.assertEqual(len(res), 1) + res = self.cu.fetchall() + self.assertEqual(res, []) + + def CheckSetinputsizes(self): + self.cu.setinputsizes([3, 4, 5]) + + def CheckSetoutputsize(self): + self.cu.setoutputsize(5, 0) + + def CheckSetoutputsizeNoColumn(self): + self.cu.setoutputsize(42) + + def CheckCursorConnection(self): + # Optional DB-API extension. + self.assertEqual(self.cu.connection, self.cx) + + def CheckWrongCursorCallable(self): + try: + def f(): pass + cur = self.cx.cursor(f) + self.fail("should have raised a TypeError") + except TypeError: + return + self.fail("should have raised a ValueError") + + def CheckCursorWrongClass(self): + class Foo: pass + foo = Foo() + try: + cur = sqlite.Cursor(foo) + self.fail("should have raised a ValueError") + except TypeError: + pass + +class ThreadTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + self.cur = self.con.cursor() + self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)") + + def tearDown(self): + self.cur.close() + self.con.close() + + def CheckConCursor(self): + def run(con, errors): + try: + cur = con.cursor() + errors.append("did not raise ProgrammingError") + return + except sqlite.ProgrammingError: + return + except: + errors.append("raised wrong exception") + + errors = [] + t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) + t.start() + t.join() + if len(errors) > 0: + self.fail("\n".join(errors)) + + def CheckConCommit(self): + def run(con, errors): + try: + con.commit() + errors.append("did not raise ProgrammingError") + return + except sqlite.ProgrammingError: + return + except: + errors.append("raised wrong exception") + + errors = [] + t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) + t.start() + t.join() + if len(errors) > 0: + self.fail("\n".join(errors)) + + def CheckConRollback(self): + def run(con, errors): + try: + con.rollback() + errors.append("did not raise ProgrammingError") + return + except sqlite.ProgrammingError: + return + except: + errors.append("raised wrong exception") + + errors = [] + t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) + t.start() + t.join() + if len(errors) > 0: + self.fail("\n".join(errors)) + + def CheckConClose(self): + def run(con, errors): + try: + con.close() + errors.append("did not raise ProgrammingError") + return + except sqlite.ProgrammingError: + return + except: + errors.append("raised wrong exception") + + errors = [] + t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) + t.start() + t.join() + if len(errors) > 0: + self.fail("\n".join(errors)) + + def CheckCurImplicitBegin(self): + def run(cur, errors): + try: + cur.execute("insert into test(name) values ('a')") + errors.append("did not raise ProgrammingError") + return + except sqlite.ProgrammingError: + return + except: + errors.append("raised wrong exception") + + errors = [] + t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) + t.start() + t.join() + if len(errors) > 0: + self.fail("\n".join(errors)) + + def CheckCurClose(self): + def run(cur, errors): + try: + cur.close() + errors.append("did not raise ProgrammingError") + return + except sqlite.ProgrammingError: + return + except: + errors.append("raised wrong exception") + + errors = [] + t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) + t.start() + t.join() + if len(errors) > 0: + self.fail("\n".join(errors)) + + def CheckCurExecute(self): + def run(cur, errors): + try: + cur.execute("select name from test") + errors.append("did not raise ProgrammingError") + return + except sqlite.ProgrammingError: + return + except: + errors.append("raised wrong exception") + + errors = [] + self.cur.execute("insert into test(name) values ('a')") + t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) + t.start() + t.join() + if len(errors) > 0: + self.fail("\n".join(errors)) + + def CheckCurIterNext(self): + def run(cur, errors): + try: + row = cur.fetchone() + errors.append("did not raise ProgrammingError") + return + except sqlite.ProgrammingError: + return + except: + errors.append("raised wrong exception") + + errors = [] + self.cur.execute("insert into test(name) values ('a')") + self.cur.execute("select name from test") + t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) + t.start() + t.join() + if len(errors) > 0: + self.fail("\n".join(errors)) + +class ConstructorTests(unittest.TestCase): + def CheckDate(self): + d = sqlite.Date(2004, 10, 28) + + def CheckTime(self): + t = sqlite.Time(12, 39, 35) + + def CheckTimestamp(self): + ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) + + def CheckDateFromTicks(self): + d = sqlite.DateFromTicks(42) + + def CheckTimeFromTicks(self): + t = sqlite.TimeFromTicks(42) + + def CheckTimestampFromTicks(self): + ts = sqlite.TimestampFromTicks(42) + + def CheckBinary(self): + b = sqlite.Binary(chr(0) + "'") + +class ExtensionTests(unittest.TestCase): + def CheckScriptStringSql(self): + con = sqlite.connect(":memory:") + cur = con.cursor() + cur.executescript(""" + -- bla bla + /* a stupid comment */ + create table a(i); + insert into a(i) values (5); + """) + cur.execute("select i from a") + res = cur.fetchone()[0] + self.assertEqual(res, 5) + + def CheckScriptStringUnicode(self): + con = sqlite.connect(":memory:") + cur = con.cursor() + cur.executescript(u""" + create table a(i); + insert into a(i) values (5); + select i from a; + delete from a; + insert into a(i) values (6); + """) + cur.execute("select i from a") + res = cur.fetchone()[0] + self.assertEqual(res, 6) + + def CheckScriptSyntaxError(self): + con = sqlite.connect(":memory:") + cur = con.cursor() + raised = False + try: + cur.executescript("create table test(x); asdf; create table test2(x)") + except sqlite.OperationalError: + raised = True + self.assertEqual(raised, True, "should have raised an exception") + + def CheckScriptErrorNormal(self): + con = sqlite.connect(":memory:") + cur = con.cursor() + raised = False + try: + cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") + except sqlite.OperationalError: + raised = True + self.assertEqual(raised, True, "should have raised an exception") + + def CheckConnectionExecute(self): + con = sqlite.connect(":memory:") + result = con.execute("select 5").fetchone()[0] + self.assertEqual(result, 5, "Basic test of Connection.execute") + + def CheckConnectionExecutemany(self): + con = sqlite.connect(":memory:") + con.execute("create table test(foo)") + con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) + result = con.execute("select foo from test order by foo").fetchall() + self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") + self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") + + def CheckConnectionExecutescript(self): + con = sqlite.connect(":memory:") + con.executescript("create table test(foo); insert into test(foo) values (5);") + result = con.execute("select foo from test").fetchone()[0] + self.assertEqual(result, 5, "Basic test of Connection.executescript") + +class ClosedConTests(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def CheckClosedConCursor(self): + con = sqlite.connect(":memory:") + con.close() + try: + cur = con.cursor() + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") + + def CheckClosedConCommit(self): + con = sqlite.connect(":memory:") + con.close() + try: + con.commit() + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") + + def CheckClosedConRollback(self): + con = sqlite.connect(":memory:") + con.close() + try: + con.rollback() + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") + + def CheckClosedCurExecute(self): + con = sqlite.connect(":memory:") + cur = con.cursor() + con.close() + try: + cur.execute("select 4") + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") + + def CheckClosedCreateFunction(self): + con = sqlite.connect(":memory:") + con.close() + def f(x): return 17 + try: + con.create_function("foo", 1, f) + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") + + def CheckClosedCreateAggregate(self): + con = sqlite.connect(":memory:") + con.close() + class Agg: + def __init__(self): + pass + def step(self, x): + pass + def finalize(self): + return 17 + try: + con.create_aggregate("foo", 1, Agg) + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") + + def CheckClosedSetAuthorizer(self): + con = sqlite.connect(":memory:") + con.close() + def authorizer(*args): + return sqlite.DENY + try: + con.set_authorizer(authorizer) + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") + + def CheckClosedSetProgressCallback(self): + con = sqlite.connect(":memory:") + con.close() + def progress(): pass + try: + con.set_progress_handler(progress, 100) + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") + + def CheckClosedCall(self): + con = sqlite.connect(":memory:") + con.close() + try: + con() + self.fail("Should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError") + +class ClosedCurTests(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def CheckClosed(self): + con = sqlite.connect(":memory:") + cur = con.cursor() + cur.close() + + for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"): + if method_name in ("execute", "executescript"): + params = ("select 4 union select 5",) + elif method_name == "executemany": + params = ("insert into foo(bar) values (?)", [(3,), (4,)]) + else: + params = [] + + try: + method = getattr(cur, method_name) + + method(*params) + self.fail("Should have raised a ProgrammingError: method " + method_name) + except sqlite.ProgrammingError: + pass + except: + self.fail("Should have raised a ProgrammingError: " + method_name) + +def suite(): + module_suite = unittest.makeSuite(ModuleTests, "Check") + connection_suite = unittest.makeSuite(ConnectionTests, "Check") + cursor_suite = unittest.makeSuite(CursorTests, "Check") + thread_suite = unittest.makeSuite(ThreadTests, "Check") + constructor_suite = unittest.makeSuite(ConstructorTests, "Check") + ext_suite = unittest.makeSuite(ExtensionTests, "Check") + closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") + closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") + return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite)) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + test() diff --git a/lib/test/dump.py b/lib/test/dump.py new file mode 100644 index 0000000..b2f2d26 --- /dev/null +++ b/lib/test/dump.py @@ -0,0 +1,52 @@ +# Author: Paul Kippes <kippesp@gmail.com> + +import unittest +from pysqlcipher import dbapi2 as sqlite + +class DumpTests(unittest.TestCase): + def setUp(self): + self.cx = sqlite.connect(":memory:") + self.cu = self.cx.cursor() + + def tearDown(self): + self.cx.close() + + def CheckTableDump(self): + expected_sqls = [ + "CREATE TABLE t1(id integer primary key, s1 text, " \ + "t1_i1 integer not null, i2 integer, unique (s1), " \ + "constraint t1_idx1 unique (i2));" + , + "INSERT INTO \"t1\" VALUES(1,'foo',10,20);" + , + "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);" + , + "CREATE TABLE t2(id integer, t2_i1 integer, " \ + "t2_i2 integer, primary key (id)," \ + "foreign key(t2_i1) references t1(t1_i1));" + , + "CREATE TRIGGER trigger_1 update of t1_i1 on t1 " \ + "begin " \ + "update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; " \ + "end;" + , + "CREATE VIEW v1 as select * from t1 left join t2 " \ + "using (id);" + ] + [self.cu.execute(s) for s in expected_sqls] + i = self.cx.iterdump() + actual_sqls = [s for s in i] + expected_sqls = ['BEGIN TRANSACTION;'] + expected_sqls + \ + ['COMMIT;'] + [self.assertEqual(expected_sqls[i], actual_sqls[i]) + for i in xrange(len(expected_sqls))] + +def suite(): + return unittest.TestSuite(unittest.makeSuite(DumpTests, "Check")) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + test() diff --git a/lib/test/factory.py b/lib/test/factory.py new file mode 100644 index 0000000..a314d99 --- /dev/null +++ b/lib/test/factory.py @@ -0,0 +1,205 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/factory.py: tests for the various factories in pysqlite +# +# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import unittest +import pysqlcipher.dbapi2 as sqlite + +class MyConnection(sqlite.Connection): + def __init__(self, *args, **kwargs): + sqlite.Connection.__init__(self, *args, **kwargs) + +def dict_factory(cursor, row): + d = {} + for idx, col in enumerate(cursor.description): + d[col[0]] = row[idx] + return d + +class MyCursor(sqlite.Cursor): + def __init__(self, *args, **kwargs): + sqlite.Cursor.__init__(self, *args, **kwargs) + self.row_factory = dict_factory + +class ConnectionFactoryTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:", factory=MyConnection) + + def tearDown(self): + self.con.close() + + def CheckIsInstance(self): + self.assertTrue(isinstance(self.con, + MyConnection), + "connection is not instance of MyConnection") + +class CursorFactoryTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + + def tearDown(self): + self.con.close() + + def CheckIsInstance(self): + cur = self.con.cursor(factory=MyCursor) + self.assertTrue(isinstance(cur, + MyCursor), + "cursor is not instance of MyCursor") + +class RowFactoryTestsBackwardsCompat(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + + def CheckIsProducedByFactory(self): + cur = self.con.cursor(factory=MyCursor) + cur.execute("select 4+5 as foo") + row = cur.fetchone() + self.assertTrue(isinstance(row, + dict), + "row is not instance of dict") + cur.close() + + def tearDown(self): + self.con.close() + +class RowFactoryTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + + def CheckCustomFactory(self): + self.con.row_factory = lambda cur, row: list(row) + row = self.con.execute("select 1, 2").fetchone() + self.assertTrue(isinstance(row, + list), + "row is not instance of list") + + def CheckSqliteRowIndex(self): + self.con.row_factory = sqlite.Row + row = self.con.execute("select 1 as a, 2 as b").fetchone() + self.assertTrue(isinstance(row, + sqlite.Row), + "row is not instance of sqlite.Row") + + col1, col2 = row["a"], row["b"] + self.assertTrue(col1 == 1, "by name: wrong result for column 'a'") + self.assertTrue(col2 == 2, "by name: wrong result for column 'a'") + + col1, col2 = row["A"], row["B"] + self.assertTrue(col1 == 1, "by name: wrong result for column 'A'") + self.assertTrue(col2 == 2, "by name: wrong result for column 'B'") + + col1, col2 = row[0], row[1] + self.assertTrue(col1 == 1, "by index: wrong result for column 0") + self.assertTrue(col2 == 2, "by index: wrong result for column 1") + + def CheckSqliteRowIter(self): + """Checks if the row object is iterable""" + self.con.row_factory = sqlite.Row + row = self.con.execute("select 1 as a, 2 as b").fetchone() + for col in row: + pass + + def CheckSqliteRowAsTuple(self): + """Checks if the row object can be converted to a tuple""" + self.con.row_factory = sqlite.Row + row = self.con.execute("select 1 as a, 2 as b").fetchone() + t = tuple(row) + + def CheckSqliteRowAsDict(self): + """Checks if the row object can be correctly converted to a dictionary""" + self.con.row_factory = sqlite.Row + row = self.con.execute("select 1 as a, 2 as b").fetchone() + d = dict(row) + self.assertEqual(d["a"], row["a"]) + self.assertEqual(d["b"], row["b"]) + + def CheckSqliteRowHashCmp(self): + """Checks if the row object compares and hashes correctly""" + self.con.row_factory = sqlite.Row + row_1 = self.con.execute("select 1 as a, 2 as b").fetchone() + row_2 = self.con.execute("select 1 as a, 2 as b").fetchone() + row_3 = self.con.execute("select 1 as a, 3 as b").fetchone() + + self.assertTrue(row_1 == row_1) + self.assertTrue(row_1 == row_2) + self.assertTrue(row_2 != row_3) + + self.assertFalse(row_1 != row_1) + self.assertFalse(row_1 != row_2) + self.assertFalse(row_2 == row_3) + + self.assertEqual(row_1, row_2) + self.assertEqual(hash(row_1), hash(row_2)) + self.assertNotEqual(row_1, row_3) + self.assertNotEqual(hash(row_1), hash(row_3)) + + def tearDown(self): + self.con.close() + +class TextFactoryTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + + def CheckUnicode(self): + austria = unicode("Österreich", "latin1") + row = self.con.execute("select ?", (austria,)).fetchone() + self.assertTrue(type(row[0]) == unicode, "type of row[0] must be unicode") + + def CheckString(self): + self.con.text_factory = str + austria = unicode("Österreich", "latin1") + row = self.con.execute("select ?", (austria,)).fetchone() + self.assertTrue(type(row[0]) == str, "type of row[0] must be str") + self.assertTrue(row[0] == austria.encode("utf-8"), "column must equal original data in UTF-8") + + def CheckCustom(self): + self.con.text_factory = lambda x: unicode(x, "utf-8", "ignore") + austria = unicode("Österreich", "latin1") + row = self.con.execute("select ?", (austria.encode("latin1"),)).fetchone() + self.assertTrue(type(row[0]) == unicode, "type of row[0] must be unicode") + self.assertTrue(row[0].endswith(u"reich"), "column must contain original data") + + def CheckOptimizedUnicode(self): + self.con.text_factory = sqlite.OptimizedUnicode + austria = unicode("Österreich", "latin1") + germany = unicode("Deutchland") + a_row = self.con.execute("select ?", (austria,)).fetchone() + d_row = self.con.execute("select ?", (germany,)).fetchone() + self.assertTrue(type(a_row[0]) == unicode, "type of non-ASCII row must be unicode") + self.assertTrue(type(d_row[0]) == str, "type of ASCII-only row must be str") + + def tearDown(self): + self.con.close() + +def suite(): + connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check") + cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check") + row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check") + row_suite = unittest.makeSuite(RowFactoryTests, "Check") + text_suite = unittest.makeSuite(TextFactoryTests, "Check") + return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite)) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + test() diff --git a/lib/test/hooks.py b/lib/test/hooks.py new file mode 100644 index 0000000..5a38060 --- /dev/null +++ b/lib/test/hooks.py @@ -0,0 +1,188 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/hooks.py: tests for various SQLite-specific hooks +# +# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import os, unittest +import pysqlcipher.dbapi2 as sqlite + +class CollationTests(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def CheckCreateCollationNotCallable(self): + con = sqlite.connect(":memory:") + try: + con.create_collation("X", 42) + self.fail("should have raised a TypeError") + except TypeError, e: + self.assertEqual(e.args[0], "parameter must be callable") + + def CheckCreateCollationNotAscii(self): + con = sqlite.connect(":memory:") + try: + con.create_collation("collä", cmp) + self.fail("should have raised a ProgrammingError") + except sqlite.ProgrammingError, e: + pass + + def CheckCollationIsUsed(self): + if sqlite.version_info < (3, 2, 1): # old SQLite versions crash on this test + return + def mycoll(x, y): + # reverse order + return -cmp(x, y) + + con = sqlite.connect(":memory:") + con.create_collation("mycoll", mycoll) + sql = """ + select x from ( + select 'a' as x + union + select 'b' as x + union + select 'c' as x + ) order by x collate mycoll + """ + result = con.execute(sql).fetchall() + if result[0][0] != "c" or result[1][0] != "b" or result[2][0] != "a": + self.fail("the expected order was not returned") + + con.create_collation("mycoll", None) + try: + result = con.execute(sql).fetchall() + self.fail("should have raised an OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0].lower(), "no such collation sequence: mycoll") + + def CheckCollationRegisterTwice(self): + """ + Register two different collation functions under the same name. + Verify that the last one is actually used. + """ + con = sqlite.connect(":memory:") + con.create_collation("mycoll", cmp) + con.create_collation("mycoll", lambda x, y: -cmp(x, y)) + result = con.execute(""" + select x from (select 'a' as x union select 'b' as x) order by x collate mycoll + """).fetchall() + if result[0][0] != 'b' or result[1][0] != 'a': + self.fail("wrong collation function is used") + + def CheckDeregisterCollation(self): + """ + Register a collation, then deregister it. Make sure an error is raised if we try + to use it. + """ + con = sqlite.connect(":memory:") + con.create_collation("mycoll", cmp) + con.create_collation("mycoll", None) + try: + con.execute("select 'a' as x union select 'b' as x order by x collate mycoll") + self.fail("should have raised an OperationalError") + except sqlite.OperationalError, e: + if not e.args[0].startswith("no such collation sequence"): + self.fail("wrong OperationalError raised") + +class ProgressTests(unittest.TestCase): + def CheckProgressHandlerUsed(self): + """ + Test that the progress handler is invoked once it is set. + """ + con = sqlite.connect(":memory:") + progress_calls = [] + def progress(): + progress_calls.append(None) + return 0 + con.set_progress_handler(progress, 1) + con.execute(""" + create table foo(a, b) + """) + self.assertTrue(progress_calls) + + + def CheckOpcodeCount(self): + """ + Test that the opcode argument is respected. + """ + con = sqlite.connect(":memory:") + progress_calls = [] + def progress(): + progress_calls.append(None) + return 0 + con.set_progress_handler(progress, 1) + curs = con.cursor() + curs.execute(""" + create table foo (a, b) + """) + first_count = len(progress_calls) + progress_calls = [] + con.set_progress_handler(progress, 2) + curs.execute(""" + create table bar (a, b) + """) + second_count = len(progress_calls) + self.assertTrue(first_count > second_count) + + def CheckCancelOperation(self): + """ + Test that returning a non-zero value stops the operation in progress. + """ + con = sqlite.connect(":memory:") + progress_calls = [] + def progress(): + progress_calls.append(None) + return 1 + con.set_progress_handler(progress, 1) + curs = con.cursor() + self.assertRaises( + sqlite.OperationalError, + curs.execute, + "create table bar (a, b)") + + def CheckClearHandler(self): + """ + Test that setting the progress handler to None clears the previously set handler. + """ + con = sqlite.connect(":memory:") + action = 0 + def progress(): + action = 1 + return 0 + con.set_progress_handler(progress, 1) + con.set_progress_handler(None, 1) + con.execute("select 1 union select 2 union select 3").fetchall() + self.assertEqual(action, 0, "progress handler was not cleared") + +def suite(): + collation_suite = unittest.makeSuite(CollationTests, "Check") + progress_suite = unittest.makeSuite(ProgressTests, "Check") + return unittest.TestSuite((collation_suite, progress_suite)) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + test() diff --git a/lib/test/py25/__init__.py b/lib/test/py25/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/lib/test/py25/__init__.py @@ -0,0 +1 @@ + diff --git a/lib/test/py25/py25tests.py b/lib/test/py25/py25tests.py new file mode 100644 index 0000000..ede1279 --- /dev/null +++ b/lib/test/py25/py25tests.py @@ -0,0 +1,80 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/regression.py: pysqlite regression tests +# +# Copyright (C) 2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +from __future__ import with_statement +import unittest +import pysqlcipher.dbapi2 as sqlite + +did_rollback = False + +class MyConnection(sqlite.Connection): + def rollback(self): + global did_rollback + did_rollback = True + sqlite.Connection.rollback(self) + +class ContextTests(unittest.TestCase): + def setUp(self): + global did_rollback + self.con = sqlite.connect(":memory:", factory=MyConnection) + self.con.execute("create table test(c unique)") + did_rollback = False + + def tearDown(self): + self.con.close() + + def CheckContextManager(self): + """Can the connection be used as a context manager at all?""" + with self.con: + pass + + def CheckContextManagerCommit(self): + """Is a commit called in the context manager?""" + with self.con: + self.con.execute("insert into test(c) values ('foo')") + self.con.rollback() + count = self.con.execute("select count(*) from test").fetchone()[0] + self.assertEqual(count, 1) + + def CheckContextManagerRollback(self): + """Is a rollback called in the context manager?""" + global did_rollback + self.assertEqual(did_rollback, False) + try: + with self.con: + self.con.execute("insert into test(c) values (4)") + self.con.execute("insert into test(c) values (4)") + except sqlite.IntegrityError: + pass + self.assertEqual(did_rollback, True) + +def suite(): + ctx_suite = unittest.makeSuite(ContextTests, "Check") + return unittest.TestSuite((ctx_suite,)) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + test() diff --git a/lib/test/regression.py b/lib/test/regression.py new file mode 100644 index 0000000..4003742 --- /dev/null +++ b/lib/test/regression.py @@ -0,0 +1,271 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/regression.py: pysqlite regression tests +# +# Copyright (C) 2006-2009 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import datetime +import unittest +import pysqlcipher.dbapi2 as sqlite + +class RegressionTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + + def tearDown(self): + self.con.close() + + def CheckPragmaUserVersion(self): + # This used to crash pysqlite because this pragma command returns NULL for the column name + cur = self.con.cursor() + cur.execute("pragma user_version") + + def CheckPragmaSchemaVersion(self): + # This still crashed pysqlite <= 2.2.1 + con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) + try: + cur = self.con.cursor() + cur.execute("pragma schema_version") + finally: + cur.close() + con.close() + + def CheckStatementReset(self): + # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are + # reset before a rollback, but only those that are still in the + # statement cache. The others are not accessible from the connection object. + con = sqlite.connect(":memory:", cached_statements=5) + cursors = [con.cursor() for x in xrange(5)] + cursors[0].execute("create table test(x)") + for i in range(10): + cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)]) + + for i in range(5): + cursors[i].execute(" " * i + "select x from test") + + con.rollback() + + def CheckColumnNameWithSpaces(self): + cur = self.con.cursor() + cur.execute('select 1 as "foo bar [datetime]"') + self.assertEqual(cur.description[0][0], "foo bar") + + cur.execute('select 1 as "foo baz"') + self.assertEqual(cur.description[0][0], "foo baz") + + def CheckStatementFinalizationOnCloseDb(self): + # pysqlite versions <= 2.3.3 only finalized statements in the statement + # cache when closing the database. statements that were still + # referenced in cursors weren't closed an could provoke " + # "OperationalError: Unable to close due to unfinalised statements". + con = sqlite.connect(":memory:") + cursors = [] + # default statement cache size is 100 + for i in range(105): + cur = con.cursor() + cursors.append(cur) + cur.execute("select 1 x union select " + str(i)) + con.close() + + def CheckOnConflictRollback(self): + if sqlite.sqlite_version_info < (3, 2, 2): + return + con = sqlite.connect(":memory:") + con.execute("create table foo(x, unique(x) on conflict rollback)") + con.execute("insert into foo(x) values (1)") + try: + con.execute("insert into foo(x) values (1)") + except sqlite.DatabaseError: + pass + con.execute("insert into foo(x) values (2)") + try: + con.commit() + except sqlite.OperationalError: + self.fail("pysqlite knew nothing about the implicit ROLLBACK") + + def CheckWorkaroundForBuggySqliteTransferBindings(self): + """ + pysqlite would crash with older SQLite versions unless + a workaround is implemented. + """ + self.con.execute("create table foo(bar)") + self.con.execute("drop table foo") + self.con.execute("create table foo(bar)") + + def CheckEmptyStatement(self): + """ + pysqlite used to segfault with SQLite versions 3.5.x. These return NULL + for "no-operation" statements + """ + self.con.execute("") + + def CheckUnicodeConnect(self): + """ + With pysqlite 2.4.0 you needed to use a string or a APSW connection + object for opening database connections. + + Formerly, both bytestrings and unicode strings used to work. + + Let's make sure unicode strings work in the future. + """ + con = sqlite.connect(u":memory:") + con.close() + + def CheckTypeMapUsage(self): + """ + pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling + a statement. This test exhibits the problem. + """ + SELECT = "select * from foo" + con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) + con.execute("create table foo(bar timestamp)") + con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),)) + con.execute(SELECT) + con.execute("drop table foo") + con.execute("create table foo(bar integer)") + con.execute("insert into foo(bar) values (5)") + con.execute(SELECT) + + def CheckRegisterAdapter(self): + """ + See issue 3312. + """ + self.assertRaises(TypeError, sqlite.register_adapter, {}, None) + + def CheckSetIsolationLevel(self): + """ + See issue 3312. + """ + con = sqlite.connect(":memory:") + self.assertRaises(UnicodeEncodeError, setattr, con, + "isolation_level", u"\xe9") + + def CheckCursorConstructorCallCheck(self): + """ + Verifies that cursor methods check wether base class __init__ was called. + """ + class Cursor(sqlite.Cursor): + def __init__(self, con): + pass + + con = sqlite.connect(":memory:") + cur = Cursor(con) + try: + cur.execute("select 4+5").fetchall() + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("should have raised ProgrammingError") + + def CheckConnectionConstructorCallCheck(self): + """ + Verifies that connection methods check wether base class __init__ was called. + """ + class Connection(sqlite.Connection): + def __init__(self, name): + pass + + con = Connection(":memory:") + try: + cur = con.cursor() + self.fail("should have raised ProgrammingError") + except sqlite.ProgrammingError: + pass + except: + self.fail("should have raised ProgrammingError") + + def CheckCursorRegistration(self): + """ + Verifies that subclassed cursor classes are correctly registered with + the connection object, too. (fetch-across-rollback problem) + """ + class Connection(sqlite.Connection): + def cursor(self): + return Cursor(self) + + class Cursor(sqlite.Cursor): + def __init__(self, con): + sqlite.Cursor.__init__(self, con) + + con = Connection(":memory:") + cur = con.cursor() + cur.execute("create table foo(x)") + cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)]) + cur.execute("select x from foo") + con.rollback() + try: + cur.fetchall() + self.fail("should have raised InterfaceError") + except sqlite.InterfaceError: + pass + except: + self.fail("should have raised InterfaceError") + + def CheckAutoCommit(self): + """ + Verifies that creating a connection in autocommit mode works. + 2.5.3 introduced a regression so that these could no longer + be created. + """ + con = sqlite.connect(":memory:", isolation_level=None) + + def CheckPragmaAutocommit(self): + """ + Verifies that running a PRAGMA statement that does an autocommit does + work. This did not work in 2.5.3/2.5.4. + """ + con = sqlite.connect(":memory:") + cur = con.cursor() + cur.execute("create table foo(bar)") + cur.execute("insert into foo(bar) values (5)") + + cur.execute("pragma page_size") + row = cur.fetchone() + + def CheckSetDict(self): + """ + See http://bugs.python.org/issue7478 + + It was possible to successfully register callbacks that could not be + hashed. Return codes of PyDict_SetItem were not checked properly. + """ + class NotHashable: + def __call__(self, *args, **kw): + pass + def __hash__(self): + raise TypeError() + var = NotHashable() + con = sqlite.connect(":memory:") + self.assertRaises(TypeError, con.create_function, var) + self.assertRaises(TypeError, con.create_aggregate, var) + self.assertRaises(TypeError, con.set_authorizer, var) + self.assertRaises(TypeError, con.set_progress_handler, var) + +def suite(): + regression_suite = unittest.makeSuite(RegressionTests, "Check") + return unittest.TestSuite((regression_suite,)) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + test() diff --git a/lib/test/transactions.py b/lib/test/transactions.py new file mode 100644 index 0000000..4e3aa08 --- /dev/null +++ b/lib/test/transactions.py @@ -0,0 +1,205 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/transactions.py: tests transactions +# +# Copyright (C) 2005-2009 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import sys +import os, unittest +import pysqlcipher.dbapi2 as sqlite + +def get_db_path(): + return "sqlite_testdb" + +class TransactionTests(unittest.TestCase): + def setUp(self): + try: + os.remove(get_db_path()) + except OSError: + pass + + self.con1 = sqlite.connect(get_db_path(), timeout=0.1) + self.cur1 = self.con1.cursor() + + self.con2 = sqlite.connect(get_db_path(), timeout=0.1) + self.cur2 = self.con2.cursor() + + def tearDown(self): + self.cur1.close() + self.con1.close() + + self.cur2.close() + self.con2.close() + + try: + os.unlink(get_db_path()) + except OSError: + pass + + def CheckDMLdoesAutoCommitBefore(self): + self.cur1.execute("create table test(i)") + self.cur1.execute("insert into test(i) values (5)") + self.cur1.execute("create table test2(j)") + self.cur2.execute("select i from test") + res = self.cur2.fetchall() + self.assertEqual(len(res), 1) + + def CheckInsertStartsTransaction(self): + self.cur1.execute("create table test(i)") + self.cur1.execute("insert into test(i) values (5)") + self.cur2.execute("select i from test") + res = self.cur2.fetchall() + self.assertEqual(len(res), 0) + + def CheckUpdateStartsTransaction(self): + self.cur1.execute("create table test(i)") + self.cur1.execute("insert into test(i) values (5)") + self.con1.commit() + self.cur1.execute("update test set i=6") + self.cur2.execute("select i from test") + res = self.cur2.fetchone()[0] + self.assertEqual(res, 5) + + def CheckDeleteStartsTransaction(self): + self.cur1.execute("create table test(i)") + self.cur1.execute("insert into test(i) values (5)") + self.con1.commit() + self.cur1.execute("delete from test") + self.cur2.execute("select i from test") + res = self.cur2.fetchall() + self.assertEqual(len(res), 1) + + def CheckReplaceStartsTransaction(self): + self.cur1.execute("create table test(i)") + self.cur1.execute("insert into test(i) values (5)") + self.con1.commit() + self.cur1.execute("replace into test(i) values (6)") + self.cur2.execute("select i from test") + res = self.cur2.fetchall() + self.assertEqual(len(res), 1) + self.assertEqual(res[0][0], 5) + + def CheckToggleAutoCommit(self): + self.cur1.execute("create table test(i)") + self.cur1.execute("insert into test(i) values (5)") + self.con1.isolation_level = None + self.assertEqual(self.con1.isolation_level, None) + self.cur2.execute("select i from test") + res = self.cur2.fetchall() + self.assertEqual(len(res), 1) + + self.con1.isolation_level = "DEFERRED" + self.assertEqual(self.con1.isolation_level , "DEFERRED") + self.cur1.execute("insert into test(i) values (5)") + self.cur2.execute("select i from test") + res = self.cur2.fetchall() + self.assertEqual(len(res), 1) + + def CheckRaiseTimeout(self): + if sqlite.sqlite_version_info < (3, 2, 2): + # This will fail (hang) on earlier versions of sqlite. + # Determine exact version it was fixed. 3.2.1 hangs. + return + self.cur1.execute("create table test(i)") + self.cur1.execute("insert into test(i) values (5)") + try: + self.cur2.execute("insert into test(i) values (5)") + self.fail("should have raised an OperationalError") + except sqlite.OperationalError: + pass + except: + self.fail("should have raised an OperationalError") + + def CheckLocking(self): + """ + This tests the improved concurrency with pysqlite 2.3.4. You needed + to roll back con2 before you could commit con1. + """ + if sqlite.sqlite_version_info < (3, 2, 2): + # This will fail (hang) on earlier versions of sqlite. + # Determine exact version it was fixed. 3.2.1 hangs. + return + self.cur1.execute("create table test(i)") + self.cur1.execute("insert into test(i) values (5)") + try: + self.cur2.execute("insert into test(i) values (5)") + self.fail("should have raised an OperationalError") + except sqlite.OperationalError: + pass + except: + self.fail("should have raised an OperationalError") + # NO self.con2.rollback() HERE!!! + self.con1.commit() + + def CheckRollbackCursorConsistency(self): + """ + Checks if cursors on the connection are set into a "reset" state + when a rollback is done on the connection. + """ + con = sqlite.connect(":memory:") + cur = con.cursor() + cur.execute("create table test(x)") + cur.execute("insert into test(x) values (5)") + cur.execute("select 1 union select 2 union select 3") + + con.rollback() + try: + cur.fetchall() + self.fail("InterfaceError should have been raised") + except sqlite.InterfaceError, e: + pass + except: + self.fail("InterfaceError should have been raised") + +class SpecialCommandTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + self.cur = self.con.cursor() + + def CheckVacuum(self): + self.cur.execute("create table test(i)") + self.cur.execute("insert into test(i) values (5)") + self.cur.execute("vacuum") + + def CheckDropTable(self): + self.cur.execute("create table test(i)") + self.cur.execute("insert into test(i) values (5)") + self.cur.execute("drop table test") + + def CheckPragma(self): + self.cur.execute("create table test(i)") + self.cur.execute("insert into test(i) values (5)") + self.cur.execute("pragma count_changes=1") + + def tearDown(self): + self.cur.close() + self.con.close() + +def suite(): + default_suite = unittest.makeSuite(TransactionTests, "Check") + special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check") + return unittest.TestSuite((default_suite, special_command_suite)) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + test() diff --git a/lib/test/types.py b/lib/test/types.py new file mode 100644 index 0000000..fc757be --- /dev/null +++ b/lib/test/types.py @@ -0,0 +1,414 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/types.py: tests for type conversion and detection +# +# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import datetime +import unittest +import pysqlcipher.dbapi2 as sqlite +import zlib + + +class SqliteTypeTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + self.cur = self.con.cursor() + self.cur.execute("create table test(i integer, s varchar, f number, b blob)") + + def tearDown(self): + self.cur.close() + self.con.close() + + def CheckString(self): + self.cur.execute("insert into test(s) values (?)", (u"Österreich",)) + self.cur.execute("select s from test") + row = self.cur.fetchone() + self.assertEqual(row[0], u"Österreich") + + def CheckSmallInt(self): + self.cur.execute("insert into test(i) values (?)", (42,)) + self.cur.execute("select i from test") + row = self.cur.fetchone() + self.assertEqual(row[0], 42) + + def CheckLargeInt(self): + num = 2**40 + self.cur.execute("insert into test(i) values (?)", (num,)) + self.cur.execute("select i from test") + row = self.cur.fetchone() + self.assertEqual(row[0], num) + + def CheckFloat(self): + val = 3.14 + self.cur.execute("insert into test(f) values (?)", (val,)) + self.cur.execute("select f from test") + row = self.cur.fetchone() + self.assertEqual(row[0], val) + + def CheckBlob(self): + val = buffer("Guglhupf") + self.cur.execute("insert into test(b) values (?)", (val,)) + self.cur.execute("select b from test") + row = self.cur.fetchone() + self.assertEqual(row[0], val) + + def CheckUnicodeExecute(self): + self.cur.execute(u"select 'Österreich'") + row = self.cur.fetchone() + self.assertEqual(row[0], u"Österreich") + + def CheckNonUtf8_Default(self): + try: + self.cur.execute("select ?", (chr(150),)) + self.fail("should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + + def CheckNonUtf8_TextFactoryString(self): + orig_text_factory = self.con.text_factory + try: + self.con.text_factory = str + self.cur.execute("select ?", (chr(150),)) + finally: + self.con.text_factory = orig_text_factory + + def CheckNonUtf8_TextFactoryOptimizedUnicode(self): + orig_text_factory = self.con.text_factory + try: + try: + self.con.text_factory = sqlite.OptimizedUnicode + self.cur.execute("select ?", (chr(150),)) + self.fail("should have raised a ProgrammingError") + except sqlite.ProgrammingError: + pass + finally: + self.con.text_factory = orig_text_factory + +class DeclTypesTests(unittest.TestCase): + class Foo: + def __init__(self, _val): + self.val = _val + + def __cmp__(self, other): + if not isinstance(other, DeclTypesTests.Foo): + raise ValueError + if self.val == other.val: + return 0 + else: + return 1 + + def __conform__(self, protocol): + if protocol is sqlite.PrepareProtocol: + return self.val + else: + return None + + def __str__(self): + return "<%s>" % self.val + + def setUp(self): + self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) + self.cur = self.con.cursor() + self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob, n1 number, n2 number(5))") + + # override float, make them always return the same number + sqlite.converters["FLOAT"] = lambda x: 47.2 + + # and implement two custom ones + sqlite.converters["BOOL"] = lambda x: bool(int(x)) + sqlite.converters["FOO"] = DeclTypesTests.Foo + sqlite.converters["WRONG"] = lambda x: "WRONG" + sqlite.converters["NUMBER"] = float + + def tearDown(self): + del sqlite.converters["FLOAT"] + del sqlite.converters["BOOL"] + del sqlite.converters["FOO"] + del sqlite.converters["NUMBER"] + self.cur.close() + self.con.close() + + def CheckString(self): + # default + self.cur.execute("insert into test(s) values (?)", ("foo",)) + self.cur.execute('select s as "s [WRONG]" from test') + row = self.cur.fetchone() + self.assertEqual(row[0], "foo") + + def CheckSmallInt(self): + # default + self.cur.execute("insert into test(i) values (?)", (42,)) + self.cur.execute("select i from test") + row = self.cur.fetchone() + self.assertEqual(row[0], 42) + + def CheckLargeInt(self): + # default + num = 2**40 + self.cur.execute("insert into test(i) values (?)", (num,)) + self.cur.execute("select i from test") + row = self.cur.fetchone() + self.assertEqual(row[0], num) + + def CheckFloat(self): + # custom + val = 3.14 + self.cur.execute("insert into test(f) values (?)", (val,)) + self.cur.execute("select f from test") + row = self.cur.fetchone() + self.assertEqual(row[0], 47.2) + + def CheckBool(self): + # custom + self.cur.execute("insert into test(b) values (?)", (False,)) + self.cur.execute("select b from test") + row = self.cur.fetchone() + self.assertEqual(row[0], False) + + self.cur.execute("delete from test") + self.cur.execute("insert into test(b) values (?)", (True,)) + self.cur.execute("select b from test") + row = self.cur.fetchone() + self.assertEqual(row[0], True) + + def CheckUnicode(self): + # default + val = u"\xd6sterreich" + self.cur.execute("insert into test(u) values (?)", (val,)) + self.cur.execute("select u from test") + row = self.cur.fetchone() + self.assertEqual(row[0], val) + + def CheckFoo(self): + val = DeclTypesTests.Foo("bla") + self.cur.execute("insert into test(foo) values (?)", (val,)) + self.cur.execute("select foo from test") + row = self.cur.fetchone() + self.assertEqual(row[0], val) + + def CheckUnsupportedSeq(self): + class Bar: pass + val = Bar() + try: + self.cur.execute("insert into test(f) values (?)", (val,)) + self.fail("should have raised an InterfaceError") + except sqlite.InterfaceError: + pass + except: + self.fail("should have raised an InterfaceError") + + def CheckUnsupportedDict(self): + class Bar: pass + val = Bar() + try: + self.cur.execute("insert into test(f) values (:val)", {"val": val}) + self.fail("should have raised an InterfaceError") + except sqlite.InterfaceError: + pass + except: + self.fail("should have raised an InterfaceError") + + def CheckBlob(self): + # default + val = buffer("Guglhupf") + self.cur.execute("insert into test(bin) values (?)", (val,)) + self.cur.execute("select bin from test") + row = self.cur.fetchone() + self.assertEqual(row[0], val) + + def CheckNumber1(self): + self.cur.execute("insert into test(n1) values (5)") + value = self.cur.execute("select n1 from test").fetchone()[0] + # if the converter is not used, it's an int instead of a float + self.assertEqual(type(value), float) + + def CheckNumber2(self): + """Checks wether converter names are cut off at '(' characters""" + self.cur.execute("insert into test(n2) values (5)") + value = self.cur.execute("select n2 from test").fetchone()[0] + # if the converter is not used, it's an int instead of a float + self.assertEqual(type(value), float) + +class ColNamesTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) + self.cur = self.con.cursor() + self.cur.execute("create table test(x foo)") + + sqlite.converters["FOO"] = lambda x: "[%s]" % x + sqlite.converters["BAR"] = lambda x: "<%s>" % x + sqlite.converters["EXC"] = lambda x: 5 // 0 + sqlite.converters["B1B1"] = lambda x: "MARKER" + + def tearDown(self): + del sqlite.converters["FOO"] + del sqlite.converters["BAR"] + del sqlite.converters["EXC"] + del sqlite.converters["B1B1"] + self.cur.close() + self.con.close() + + def CheckDeclTypeNotUsed(self): + """ + Assures that the declared type is not used when PARSE_DECLTYPES + is not set. + """ + self.cur.execute("insert into test(x) values (?)", ("xxx",)) + self.cur.execute("select x from test") + val = self.cur.fetchone()[0] + self.assertEqual(val, "xxx") + + def CheckNone(self): + self.cur.execute("insert into test(x) values (?)", (None,)) + self.cur.execute("select x from test") + val = self.cur.fetchone()[0] + self.assertEqual(val, None) + + def CheckColName(self): + self.cur.execute("insert into test(x) values (?)", ("xxx",)) + self.cur.execute('select x as "x [bar]" from test') + val = self.cur.fetchone()[0] + self.assertEqual(val, "<xxx>") + + # Check if the stripping of colnames works. Everything after the first + # whitespace should be stripped. + self.assertEqual(self.cur.description[0][0], "x") + + def CheckCaseInConverterName(self): + self.cur.execute("""select 'other' as "x [b1b1]\"""") + val = self.cur.fetchone()[0] + self.assertEqual(val, "MARKER") + + def CheckCursorDescriptionNoRow(self): + """ + cursor.description should at least provide the column name(s), even if + no row returned. + """ + self.cur.execute("select * from test where 0 = 1") + self.assert_(self.cur.description[0][0] == "x") + +class ObjectAdaptationTests(unittest.TestCase): + def cast(obj): + return float(obj) + cast = staticmethod(cast) + + def setUp(self): + self.con = sqlite.connect(":memory:") + try: + del sqlite.adapters[int] + except: + pass + sqlite.register_adapter(int, ObjectAdaptationTests.cast) + self.cur = self.con.cursor() + + def tearDown(self): + del sqlite.adapters[(int, sqlite.PrepareProtocol)] + self.cur.close() + self.con.close() + + def CheckCasterIsUsed(self): + self.cur.execute("select ?", (4,)) + val = self.cur.fetchone()[0] + self.assertEqual(type(val), float) + +class BinaryConverterTests(unittest.TestCase): + def convert(s): + return zlib.decompress(s) + convert = staticmethod(convert) + + def setUp(self): + self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) + sqlite.register_converter("bin", BinaryConverterTests.convert) + + def tearDown(self): + self.con.close() + + def CheckBinaryInputForConverter(self): + testdata = "abcdefg" * 10 + result = self.con.execute('select ? as "x [bin]"', (buffer(zlib.compress(testdata)),)).fetchone()[0] + self.assertEqual(testdata, result) + +class DateTimeTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) + self.cur = self.con.cursor() + self.cur.execute("create table test(d date, ts timestamp)") + + def tearDown(self): + self.cur.close() + self.con.close() + + def CheckSqliteDate(self): + d = sqlite.Date(2004, 2, 14) + self.cur.execute("insert into test(d) values (?)", (d,)) + self.cur.execute("select d from test") + d2 = self.cur.fetchone()[0] + self.assertEqual(d, d2) + + def CheckSqliteTimestamp(self): + ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0) + self.cur.execute("insert into test(ts) values (?)", (ts,)) + self.cur.execute("select ts from test") + ts2 = self.cur.fetchone()[0] + self.assertEqual(ts, ts2) + + def CheckSqlTimestamp(self): + # The date functions are only available in SQLite version 3.1 or later + if sqlite.sqlite_version_info < (3, 1): + return + + # SQLite's current_timestamp uses UTC time, while datetime.datetime.now() uses local time. + now = datetime.datetime.now() + self.cur.execute("insert into test(ts) values (current_timestamp)") + self.cur.execute("select ts from test") + ts = self.cur.fetchone()[0] + self.assertEqual(type(ts), datetime.datetime) + self.assertEqual(ts.year, now.year) + + def CheckDateTimeSubSeconds(self): + ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000) + self.cur.execute("insert into test(ts) values (?)", (ts,)) + self.cur.execute("select ts from test") + ts2 = self.cur.fetchone()[0] + self.assertEqual(ts, ts2) + + def CheckDateTimeSubSecondsFloatingPoint(self): + ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241) + self.cur.execute("insert into test(ts) values (?)", (ts,)) + self.cur.execute("select ts from test") + ts2 = self.cur.fetchone()[0] + self.assertEqual(ts, ts2) + +def suite(): + sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check") + decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check") + colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check") + adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check") + bin_suite = unittest.makeSuite(BinaryConverterTests, "Check") + date_suite = unittest.makeSuite(DateTimeTests, "Check") + return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite)) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + test() diff --git a/lib/test/userfunctions.py b/lib/test/userfunctions.py new file mode 100644 index 0000000..6b60f60 --- /dev/null +++ b/lib/test/userfunctions.py @@ -0,0 +1,413 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/userfunctions.py: tests for user-defined functions and +# aggregates. +# +# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty. In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +# claim that you wrote the original software. If you use this software +# in a product, an acknowledgment in the product documentation would be +# appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +# misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import unittest +import pysqlcipher.dbapi2 as sqlite + +def func_returntext(): + return "foo" +def func_returnunicode(): + return u"bar" +def func_returnint(): + return 42 +def func_returnfloat(): + return 3.14 +def func_returnnull(): + return None +def func_returnblob(): + return buffer("blob") +def func_raiseexception(): + 5 // 0 + +def func_isstring(v): + return type(v) is unicode +def func_isint(v): + return type(v) is int +def func_isfloat(v): + return type(v) is float +def func_isnone(v): + return type(v) is type(None) +def func_isblob(v): + return type(v) is buffer + +class AggrNoStep: + def __init__(self): + pass + + def finalize(self): + return 1 + +class AggrNoFinalize: + def __init__(self): + pass + + def step(self, x): + pass + +class AggrExceptionInInit: + def __init__(self): + 5 // 0 + + def step(self, x): + pass + + def finalize(self): + pass + +class AggrExceptionInStep: + def __init__(self): + pass + + def step(self, x): + 5 // 0 + + def finalize(self): + return 42 + +class AggrExceptionInFinalize: + def __init__(self): + pass + + def step(self, x): + pass + + def finalize(self): + 5 // 0 + +class AggrCheckType: + def __init__(self): + self.val = None + + def step(self, whichType, val): + theType = {"str": unicode, "int": int, "float": float, "None": type(None), "blob": buffer} + self.val = int(theType[whichType] is type(val)) + + def finalize(self): + return self.val + +class AggrSum: + def __init__(self): + self.val = 0.0 + + def step(self, val): + self.val += val + + def finalize(self): + return self.val + +class FunctionTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + + self.con.create_function("returntext", 0, func_returntext) + self.con.create_function("returnunicode", 0, func_returnunicode) + self.con.create_function("returnint", 0, func_returnint) + self.con.create_function("returnfloat", 0, func_returnfloat) + self.con.create_function("returnnull", 0, func_returnnull) + self.con.create_function("returnblob", 0, func_returnblob) + self.con.create_function("raiseexception", 0, func_raiseexception) + + self.con.create_function("isstring", 1, func_isstring) + self.con.create_function("isint", 1, func_isint) + self.con.create_function("isfloat", 1, func_isfloat) + self.con.create_function("isnone", 1, func_isnone) + self.con.create_function("isblob", 1, func_isblob) + + def tearDown(self): + self.con.close() + + def CheckFuncErrorOnCreate(self): + try: + self.con.create_function("bla", -100, lambda x: 2*x) + self.fail("should have raised an OperationalError") + except sqlite.OperationalError: + pass + + def CheckFuncRefCount(self): + def getfunc(): + def f(): + return 1 + return f + f = getfunc() + globals()["foo"] = f + # self.con.create_function("reftest", 0, getfunc()) + self.con.create_function("reftest", 0, f) + cur = self.con.cursor() + cur.execute("select reftest()") + + def CheckFuncReturnText(self): + cur = self.con.cursor() + cur.execute("select returntext()") + val = cur.fetchone()[0] + self.assertEqual(type(val), unicode) + self.assertEqual(val, "foo") + + def CheckFuncReturnUnicode(self): + cur = self.con.cursor() + cur.execute("select returnunicode()") + val = cur.fetchone()[0] + self.assertEqual(type(val), unicode) + self.assertEqual(val, u"bar") + + def CheckFuncReturnInt(self): + cur = self.con.cursor() + cur.execute("select returnint()") + val = cur.fetchone()[0] + self.assertEqual(type(val), int) + self.assertEqual(val, 42) + + def CheckFuncReturnFloat(self): + cur = self.con.cursor() + cur.execute("select returnfloat()") + val = cur.fetchone()[0] + self.assertEqual(type(val), float) + if val < 3.139 or val > 3.141: + self.fail("wrong value") + + def CheckFuncReturnNull(self): + cur = self.con.cursor() + cur.execute("select returnnull()") + val = cur.fetchone()[0] + self.assertEqual(type(val), type(None)) + self.assertEqual(val, None) + + def CheckFuncReturnBlob(self): + cur = self.con.cursor() + cur.execute("select returnblob()") + val = cur.fetchone()[0] + self.assertEqual(type(val), buffer) + self.assertEqual(val, buffer("blob")) + + def CheckFuncException(self): + cur = self.con.cursor() + try: + cur.execute("select raiseexception()") + cur.fetchone() + self.fail("should have raised OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0], 'user-defined function raised exception') + + def CheckParamString(self): + cur = self.con.cursor() + cur.execute("select isstring(?)", ("foo",)) + val = cur.fetchone()[0] + self.assertEqual(val, 1) + + def CheckParamInt(self): + cur = self.con.cursor() + cur.execute("select isint(?)", (42,)) + val = cur.fetchone()[0] + self.assertEqual(val, 1) + + def CheckParamFloat(self): + cur = self.con.cursor() + cur.execute("select isfloat(?)", (3.14,)) + val = cur.fetchone()[0] + self.assertEqual(val, 1) + + def CheckParamNone(self): + cur = self.con.cursor() + cur.execute("select isnone(?)", (None,)) + val = cur.fetchone()[0] + self.assertEqual(val, 1) + + def CheckParamBlob(self): + cur = self.con.cursor() + cur.execute("select isblob(?)", (buffer("blob"),)) + val = cur.fetchone()[0] + self.assertEqual(val, 1) + +class AggregateTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + cur = self.con.cursor() + cur.execute(""" + create table test( + t text, + i integer, + f float, + n, + b blob + ) + """) + cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)", + ("foo", 5, 3.14, None, buffer("blob"),)) + + self.con.create_aggregate("nostep", 1, AggrNoStep) + self.con.create_aggregate("nofinalize", 1, AggrNoFinalize) + self.con.create_aggregate("excInit", 1, AggrExceptionInInit) + self.con.create_aggregate("excStep", 1, AggrExceptionInStep) + self.con.create_aggregate("excFinalize", 1, AggrExceptionInFinalize) + self.con.create_aggregate("checkType", 2, AggrCheckType) + self.con.create_aggregate("mysum", 1, AggrSum) + + def tearDown(self): + #self.cur.close() + #self.con.close() + pass + + def CheckAggrErrorOnCreate(self): + try: + self.con.create_function("bla", -100, AggrSum) + self.fail("should have raised an OperationalError") + except sqlite.OperationalError: + pass + + def CheckAggrNoStep(self): + cur = self.con.cursor() + try: + cur.execute("select nostep(t) from test") + self.fail("should have raised an AttributeError") + except AttributeError, e: + self.assertEqual(e.args[0], "AggrNoStep instance has no attribute 'step'") + + def CheckAggrNoFinalize(self): + cur = self.con.cursor() + try: + cur.execute("select nofinalize(t) from test") + val = cur.fetchone()[0] + self.fail("should have raised an OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") + + def CheckAggrExceptionInInit(self): + cur = self.con.cursor() + try: + cur.execute("select excInit(t) from test") + val = cur.fetchone()[0] + self.fail("should have raised an OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0], "user-defined aggregate's '__init__' method raised error") + + def CheckAggrExceptionInStep(self): + cur = self.con.cursor() + try: + cur.execute("select excStep(t) from test") + val = cur.fetchone()[0] + self.fail("should have raised an OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0], "user-defined aggregate's 'step' method raised error") + + def CheckAggrExceptionInFinalize(self): + cur = self.con.cursor() + try: + cur.execute("select excFinalize(t) from test") + val = cur.fetchone()[0] + self.fail("should have raised an OperationalError") + except sqlite.OperationalError, e: + self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") + + def CheckAggrCheckParamStr(self): + cur = self.con.cursor() + cur.execute("select checkType('str', ?)", ("foo",)) + val = cur.fetchone()[0] + self.assertEqual(val, 1) + + def CheckAggrCheckParamInt(self): + cur = self.con.cursor() + cur.execute("select checkType('int', ?)", (42,)) + val = cur.fetchone()[0] + self.assertEqual(val, 1) + + def CheckAggrCheckParamFloat(self): + cur = self.con.cursor() + cur.execute("select checkType('float', ?)", (3.14,)) + val = cur.fetchone()[0] + self.assertEqual(val, 1) + + def CheckAggrCheckParamNone(self): + cur = self.con.cursor() + cur.execute("select checkType('None', ?)", (None,)) + val = cur.fetchone()[0] + self.assertEqual(val, 1) + + def CheckAggrCheckParamBlob(self): + cur = self.con.cursor() + cur.execute("select checkType('blob', ?)", (buffer("blob"),)) + val = cur.fetchone()[0] + self.assertEqual(val, 1) + + def CheckAggrCheckAggrSum(self): + cur = self.con.cursor() + cur.execute("delete from test") + cur.executemany("insert into test(i) values (?)", [(10,), (20,), (30,)]) + cur.execute("select mysum(i) from test") + val = cur.fetchone()[0] + self.assertEqual(val, 60) + +def authorizer_cb(action, arg1, arg2, dbname, source): + if action != sqlite.SQLITE_SELECT: + return sqlite.SQLITE_DENY + if arg2 == 'c2' or arg1 == 't2': + return sqlite.SQLITE_DENY + return sqlite.SQLITE_OK + +class AuthorizerTests(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + self.con.executescript(""" + create table t1 (c1, c2); + create table t2 (c1, c2); + insert into t1 (c1, c2) values (1, 2); + insert into t2 (c1, c2) values (4, 5); + """) + + # For our security test: + self.con.execute("select c2 from t2") + + self.con.set_authorizer(authorizer_cb) + + def tearDown(self): + pass + + def CheckTableAccess(self): + try: + self.con.execute("select * from t2") + except sqlite.DatabaseError, e: + if not e.args[0].endswith("prohibited"): + self.fail("wrong exception text: %s" % e.args[0]) + return + self.fail("should have raised an exception due to missing privileges") + + def CheckColumnAccess(self): + try: + self.con.execute("select c2 from t1") + except sqlite.DatabaseError, e: + if not e.args[0].endswith("prohibited"): + self.fail("wrong exception text: %s" % e.args[0]) + return + self.fail("should have raised an exception due to missing privileges") + +def suite(): + function_suite = unittest.makeSuite(FunctionTests, "Check") + aggregate_suite = unittest.makeSuite(AggregateTests, "Check") + authorizer_suite = unittest.makeSuite(AuthorizerTests, "Check") + return unittest.TestSuite((function_suite, aggregate_suite, authorizer_suite)) + +def test(): + runner = unittest.TextTestRunner() + runner.run(suite()) + +if __name__ == "__main__": + test() |