diff options
| author | Kali Kaneko <kali@futeisha.org> | 2013-02-04 19:20:12 +0900 | 
|---|---|---|
| committer | Kali Kaneko <kali@futeisha.org> | 2013-02-04 19:20:12 +0900 | 
| commit | 4189e53a881e52de0945375a72b8748143c5bd13 (patch) | |
| tree | 23ed8e0600dc3c62da7a95e50f778c79a9be1e75 /lib/test | |
initial commit
Diffstat (limited to 'lib/test')
| -rw-r--r-- | lib/test/__init__.py | 50 | ||||
| -rw-r--r-- | lib/test/dbapi.py | 878 | ||||
| -rw-r--r-- | lib/test/dump.py | 52 | ||||
| -rw-r--r-- | lib/test/factory.py | 205 | ||||
| -rw-r--r-- | lib/test/hooks.py | 188 | ||||
| -rw-r--r-- | lib/test/py25/__init__.py | 1 | ||||
| -rw-r--r-- | lib/test/py25/py25tests.py | 80 | ||||
| -rw-r--r-- | lib/test/regression.py | 271 | ||||
| -rw-r--r-- | lib/test/transactions.py | 205 | ||||
| -rw-r--r-- | lib/test/types.py | 414 | ||||
| -rw-r--r-- | lib/test/userfunctions.py | 413 | 
11 files changed, 2757 insertions, 0 deletions
| diff --git a/lib/test/__init__.py b/lib/test/__init__.py new file mode 100644 index 0000000..1c760bb --- /dev/null +++ b/lib/test/__init__.py @@ -0,0 +1,50 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/__init__.py: the package containing the test suite +# +# Copyright (C) 2004-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty.  In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +#    claim that you wrote the original software. If you use this software +#    in a product, an acknowledgment in the product documentation would be +#    appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +#    misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import os, sys +import unittest + +if os.path.exists("extended_setup.py"): +    print "-" * 75 +    print "You should not run the test suite from the pysqlite build directory." +    print "This does not work well because the extension module cannot be found." +    print "Just run the test suite from somewhere else, please!" +    print "-" * 75 +    sys.exit(1) + +from pysqlite2.test import dbapi, types, userfunctions, factory, transactions,\ +    hooks, regression, dump +from pysqlcipher import dbapi2 as sqlite + +def suite(): +    tests = [dbapi.suite(), types.suite(), userfunctions.suite(), +      factory.suite(), transactions.suite(), hooks.suite(), regression.suite(), dump.suite()] +    if sys.version_info >= (2, 5, 0): +        from pysqlite2.test.py25 import py25tests +        tests.append(py25tests.suite()) + +    return unittest.TestSuite(tuple(tests)) + +def test(): +    runner = unittest.TextTestRunner() +    runner.run(suite()) diff --git a/lib/test/dbapi.py b/lib/test/dbapi.py new file mode 100644 index 0000000..05dfd4b --- /dev/null +++ b/lib/test/dbapi.py @@ -0,0 +1,878 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/dbapi.py: tests for DB-API compliance +# +# Copyright (C) 2004-2009 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty.  In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +#    claim that you wrote the original software. If you use this software +#    in a product, an acknowledgment in the product documentation would be +#    appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +#    misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import unittest +import sys +import threading +import pysqlcipher.dbapi2 as sqlite + +class ModuleTests(unittest.TestCase): +    def CheckAPILevel(self): +        self.assertEqual(sqlite.apilevel, "2.0", +                         "apilevel is %s, should be 2.0" % sqlite.apilevel) + +    def CheckThreadSafety(self): +        self.assertEqual(sqlite.threadsafety, 1, +                         "threadsafety is %d, should be 1" % sqlite.threadsafety) + +    def CheckParamStyle(self): +        self.assertEqual(sqlite.paramstyle, "qmark", +                         "paramstyle is '%s', should be 'qmark'" % +                         sqlite.paramstyle) + +    def CheckWarning(self): +        self.assert_(issubclass(sqlite.Warning, StandardError), +                     "Warning is not a subclass of StandardError") + +    def CheckError(self): +        self.assertTrue(issubclass(sqlite.Error, StandardError), +                        "Error is not a subclass of StandardError") + +    def CheckInterfaceError(self): +        self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), +                        "InterfaceError is not a subclass of Error") + +    def CheckDatabaseError(self): +        self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), +                        "DatabaseError is not a subclass of Error") + +    def CheckDataError(self): +        self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), +                        "DataError is not a subclass of DatabaseError") + +    def CheckOperationalError(self): +        self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), +                        "OperationalError is not a subclass of DatabaseError") + +    def CheckIntegrityError(self): +        self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), +                        "IntegrityError is not a subclass of DatabaseError") + +    def CheckInternalError(self): +        self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), +                        "InternalError is not a subclass of DatabaseError") + +    def CheckProgrammingError(self): +        self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), +                        "ProgrammingError is not a subclass of DatabaseError") + +    def CheckNotSupportedError(self): +        self.assertTrue(issubclass(sqlite.NotSupportedError, +                                   sqlite.DatabaseError), +                        "NotSupportedError is not a subclass of DatabaseError") + +class ConnectionTests(unittest.TestCase): +    def setUp(self): +        self.cx = sqlite.connect(":memory:") +        cu = self.cx.cursor() +        cu.execute("create table test(id integer primary key, name text)") +        cu.execute("insert into test(name) values (?)", ("foo",)) + +    def tearDown(self): +        self.cx.close() + +    def CheckCommit(self): +        self.cx.commit() + +    def CheckCommitAfterNoChanges(self): +        """ +        A commit should also work when no changes were made to the database. +        """ +        self.cx.commit() +        self.cx.commit() + +    def CheckRollback(self): +        self.cx.rollback() + +    def CheckRollbackAfterNoChanges(self): +        """ +        A rollback should also work when no changes were made to the database. +        """ +        self.cx.rollback() +        self.cx.rollback() + +    def CheckCursor(self): +        cu = self.cx.cursor() + +    def CheckFailedOpen(self): +        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" +        try: +            con = sqlite.connect(YOU_CANNOT_OPEN_THIS) +        except sqlite.OperationalError: +            return +        self.fail("should have raised an OperationalError") + +    def CheckClose(self): +        self.cx.close() + +    def CheckExceptions(self): +        # Optional DB-API extension. +        self.assertEqual(self.cx.Warning, sqlite.Warning) +        self.assertEqual(self.cx.Error, sqlite.Error) +        self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError) +        self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError) +        self.assertEqual(self.cx.DataError, sqlite.DataError) +        self.assertEqual(self.cx.OperationalError, sqlite.OperationalError) +        self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError) +        self.assertEqual(self.cx.InternalError, sqlite.InternalError) +        self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) +        self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) + +class CursorTests(unittest.TestCase): +    def setUp(self): +        self.cx = sqlite.connect(":memory:") +        self.cu = self.cx.cursor() +        self.cu.execute("create table test(id integer primary key, name text, income number)") +        self.cu.execute("insert into test(name) values (?)", ("foo",)) + +    def tearDown(self): +        self.cu.close() +        self.cx.close() + +    def CheckExecuteNoArgs(self): +        self.cu.execute("delete from test") + +    def CheckExecuteIllegalSql(self): +        try: +            self.cu.execute("select asdf") +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError: +            return +        except: +            self.fail("raised wrong exception") + +    def CheckExecuteTooMuchSql(self): +        try: +            self.cu.execute("select 5+4; select 4+5") +            self.fail("should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            return +        except: +            self.fail("raised wrong exception") + +    def CheckExecuteTooMuchSql2(self): +        self.cu.execute("select 5+4; -- foo bar") + +    def CheckExecuteTooMuchSql3(self): +        self.cu.execute(""" +            select 5+4; + +            /* +            foo +            */ +            """) + +    def CheckExecuteWrongSqlArg(self): +        try: +            self.cu.execute(42) +            self.fail("should have raised a ValueError") +        except ValueError: +            return +        except: +            self.fail("raised wrong exception.") + +    def CheckExecuteArgInt(self): +        self.cu.execute("insert into test(id) values (?)", (42,)) + +    def CheckExecuteArgFloat(self): +        self.cu.execute("insert into test(income) values (?)", (2500.32,)) + +    def CheckExecuteArgString(self): +        self.cu.execute("insert into test(name) values (?)", ("Hugo",)) + +    def CheckExecuteWrongNoOfArgs1(self): +        # too many parameters +        try: +            self.cu.execute("insert into test(id) values (?)", (17, "Egon")) +            self.fail("should have raised ProgrammingError") +        except sqlite.ProgrammingError: +            pass + +    def CheckExecuteWrongNoOfArgs2(self): +        # too little parameters +        try: +            self.cu.execute("insert into test(id) values (?)") +            self.fail("should have raised ProgrammingError") +        except sqlite.ProgrammingError: +            pass + +    def CheckExecuteWrongNoOfArgs3(self): +        # no parameters, parameters are needed +        try: +            self.cu.execute("insert into test(id) values (?)") +            self.fail("should have raised ProgrammingError") +        except sqlite.ProgrammingError: +            pass + +    def CheckExecuteParamList(self): +        self.cu.execute("insert into test(name) values ('foo')") +        self.cu.execute("select name from test where name=?", ["foo"]) +        row = self.cu.fetchone() +        self.assertEqual(row[0], "foo") + +    def CheckExecuteParamSequence(self): +        class L(object): +            def __len__(self): +                return 1 +            def __getitem__(self, x): +                assert x == 0 +                return "foo" + +        self.cu.execute("insert into test(name) values ('foo')") +        self.cu.execute("select name from test where name=?", L()) +        row = self.cu.fetchone() +        self.assertEqual(row[0], "foo") + +    def CheckExecuteDictMapping(self): +        self.cu.execute("insert into test(name) values ('foo')") +        self.cu.execute("select name from test where name=:name", {"name": "foo"}) +        row = self.cu.fetchone() +        self.assertEqual(row[0], "foo") + +    def CheckExecuteDictMapping_Mapping(self): +        # Test only works with Python 2.5 or later +        if sys.version_info < (2, 5, 0): +            return + +        class D(dict): +            def __missing__(self, key): +                return "foo" + +        self.cu.execute("insert into test(name) values ('foo')") +        self.cu.execute("select name from test where name=:name", D()) +        row = self.cu.fetchone() +        self.assertEqual(row[0], "foo") + +    def CheckExecuteDictMappingTooLittleArgs(self): +        self.cu.execute("insert into test(name) values ('foo')") +        try: +            self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) +            self.fail("should have raised ProgrammingError") +        except sqlite.ProgrammingError: +            pass + +    def CheckExecuteDictMappingNoArgs(self): +        self.cu.execute("insert into test(name) values ('foo')") +        try: +            self.cu.execute("select name from test where name=:name") +            self.fail("should have raised ProgrammingError") +        except sqlite.ProgrammingError: +            pass + +    def CheckExecuteDictMappingUnnamed(self): +        self.cu.execute("insert into test(name) values ('foo')") +        try: +            self.cu.execute("select name from test where name=?", {"name": "foo"}) +            self.fail("should have raised ProgrammingError") +        except sqlite.ProgrammingError: +            pass + +    def CheckClose(self): +        self.cu.close() + +    def CheckRowcountExecute(self): +        self.cu.execute("delete from test") +        self.cu.execute("insert into test(name) values ('foo')") +        self.cu.execute("insert into test(name) values ('foo')") +        self.cu.execute("update test set name='bar'") +        self.assertEqual(self.cu.rowcount, 2) + +    def CheckRowcountSelect(self): +        """ +        pysqlite does not know the rowcount of SELECT statements, because we +        don't fetch all rows after executing the select statement. The rowcount +        has thus to be -1. +        """ +        self.cu.execute("select 5 union select 6") +        self.assertEqual(self.cu.rowcount, -1) + +    def CheckRowcountExecutemany(self): +        self.cu.execute("delete from test") +        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) +        self.assertEqual(self.cu.rowcount, 3) + +    def CheckTotalChanges(self): +        self.cu.execute("insert into test(name) values ('foo')") +        self.cu.execute("insert into test(name) values ('foo')") +        if self.cx.total_changes < 2: +            self.fail("total changes reported wrong value") + +    # Checks for executemany: +    # Sequences are required by the DB-API, iterators +    # enhancements in pysqlite. + +    def CheckExecuteManySequence(self): +        self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) + +    def CheckExecuteManyIterator(self): +        class MyIter: +            def __init__(self): +                self.value = 5 + +            def next(self): +                if self.value == 10: +                    raise StopIteration +                else: +                    self.value += 1 +                    return (self.value,) + +        self.cu.executemany("insert into test(income) values (?)", MyIter()) + +    def CheckExecuteManyGenerator(self): +        def mygen(): +            for i in range(5): +                yield (i,) + +        self.cu.executemany("insert into test(income) values (?)", mygen()) + +    def CheckExecuteManyWrongSqlArg(self): +        try: +            self.cu.executemany(42, [(3,)]) +            self.fail("should have raised a ValueError") +        except ValueError: +            return +        except: +            self.fail("raised wrong exception.") + +    def CheckExecuteManySelect(self): +        try: +            self.cu.executemany("select ?", [(3,)]) +            self.fail("should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            return +        except: +            self.fail("raised wrong exception.") + +    def CheckExecuteManyNotIterable(self): +        try: +            self.cu.executemany("insert into test(income) values (?)", 42) +            self.fail("should have raised a TypeError") +        except TypeError: +            return +        except Exception, e: +            print "raised", e.__class__ +            self.fail("raised wrong exception.") + +    def CheckFetchIter(self): +        # Optional DB-API extension. +        self.cu.execute("delete from test") +        self.cu.execute("insert into test(id) values (?)", (5,)) +        self.cu.execute("insert into test(id) values (?)", (6,)) +        self.cu.execute("select id from test order by id") +        lst = [] +        for row in self.cu: +            lst.append(row[0]) +        self.assertEqual(lst[0], 5) +        self.assertEqual(lst[1], 6) + +    def CheckFetchone(self): +        self.cu.execute("select name from test") +        row = self.cu.fetchone() +        self.assertEqual(row[0], "foo") +        row = self.cu.fetchone() +        self.assertEqual(row, None) + +    def CheckFetchoneNoStatement(self): +        cur = self.cx.cursor() +        row = cur.fetchone() +        self.assertEqual(row, None) + +    def CheckArraySize(self): +        # must default ot 1 +        self.assertEqual(self.cu.arraysize, 1) + +        # now set to 2 +        self.cu.arraysize = 2 + +        # now make the query return 3 rows +        self.cu.execute("delete from test") +        self.cu.execute("insert into test(name) values ('A')") +        self.cu.execute("insert into test(name) values ('B')") +        self.cu.execute("insert into test(name) values ('C')") +        self.cu.execute("select name from test") +        res = self.cu.fetchmany() + +        self.assertEqual(len(res), 2) + +    def CheckFetchmany(self): +        self.cu.execute("select name from test") +        res = self.cu.fetchmany(100) +        self.assertEqual(len(res), 1) +        res = self.cu.fetchmany(100) +        self.assertEqual(res, []) + +    def CheckFetchmanyKwArg(self): +        """Checks if fetchmany works with keyword arguments""" +        self.cu.execute("select name from test") +        res = self.cu.fetchmany(size=100) +        self.assertEqual(len(res), 1) + +    def CheckFetchall(self): +        self.cu.execute("select name from test") +        res = self.cu.fetchall() +        self.assertEqual(len(res), 1) +        res = self.cu.fetchall() +        self.assertEqual(res, []) + +    def CheckSetinputsizes(self): +        self.cu.setinputsizes([3, 4, 5]) + +    def CheckSetoutputsize(self): +        self.cu.setoutputsize(5, 0) + +    def CheckSetoutputsizeNoColumn(self): +        self.cu.setoutputsize(42) + +    def CheckCursorConnection(self): +        # Optional DB-API extension. +        self.assertEqual(self.cu.connection, self.cx) + +    def CheckWrongCursorCallable(self): +        try: +            def f(): pass +            cur = self.cx.cursor(f) +            self.fail("should have raised a TypeError") +        except TypeError: +            return +        self.fail("should have raised a ValueError") + +    def CheckCursorWrongClass(self): +        class Foo: pass +        foo = Foo() +        try: +            cur = sqlite.Cursor(foo) +            self.fail("should have raised a ValueError") +        except TypeError: +            pass + +class ThreadTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") +        self.cur = self.con.cursor() +        self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)") + +    def tearDown(self): +        self.cur.close() +        self.con.close() + +    def CheckConCursor(self): +        def run(con, errors): +            try: +                cur = con.cursor() +                errors.append("did not raise ProgrammingError") +                return +            except sqlite.ProgrammingError: +                return +            except: +                errors.append("raised wrong exception") + +        errors = [] +        t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) +        t.start() +        t.join() +        if len(errors) > 0: +            self.fail("\n".join(errors)) + +    def CheckConCommit(self): +        def run(con, errors): +            try: +                con.commit() +                errors.append("did not raise ProgrammingError") +                return +            except sqlite.ProgrammingError: +                return +            except: +                errors.append("raised wrong exception") + +        errors = [] +        t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) +        t.start() +        t.join() +        if len(errors) > 0: +            self.fail("\n".join(errors)) + +    def CheckConRollback(self): +        def run(con, errors): +            try: +                con.rollback() +                errors.append("did not raise ProgrammingError") +                return +            except sqlite.ProgrammingError: +                return +            except: +                errors.append("raised wrong exception") + +        errors = [] +        t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) +        t.start() +        t.join() +        if len(errors) > 0: +            self.fail("\n".join(errors)) + +    def CheckConClose(self): +        def run(con, errors): +            try: +                con.close() +                errors.append("did not raise ProgrammingError") +                return +            except sqlite.ProgrammingError: +                return +            except: +                errors.append("raised wrong exception") + +        errors = [] +        t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) +        t.start() +        t.join() +        if len(errors) > 0: +            self.fail("\n".join(errors)) + +    def CheckCurImplicitBegin(self): +        def run(cur, errors): +            try: +                cur.execute("insert into test(name) values ('a')") +                errors.append("did not raise ProgrammingError") +                return +            except sqlite.ProgrammingError: +                return +            except: +                errors.append("raised wrong exception") + +        errors = [] +        t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) +        t.start() +        t.join() +        if len(errors) > 0: +            self.fail("\n".join(errors)) + +    def CheckCurClose(self): +        def run(cur, errors): +            try: +                cur.close() +                errors.append("did not raise ProgrammingError") +                return +            except sqlite.ProgrammingError: +                return +            except: +                errors.append("raised wrong exception") + +        errors = [] +        t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) +        t.start() +        t.join() +        if len(errors) > 0: +            self.fail("\n".join(errors)) + +    def CheckCurExecute(self): +        def run(cur, errors): +            try: +                cur.execute("select name from test") +                errors.append("did not raise ProgrammingError") +                return +            except sqlite.ProgrammingError: +                return +            except: +                errors.append("raised wrong exception") + +        errors = [] +        self.cur.execute("insert into test(name) values ('a')") +        t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) +        t.start() +        t.join() +        if len(errors) > 0: +            self.fail("\n".join(errors)) + +    def CheckCurIterNext(self): +        def run(cur, errors): +            try: +                row = cur.fetchone() +                errors.append("did not raise ProgrammingError") +                return +            except sqlite.ProgrammingError: +                return +            except: +                errors.append("raised wrong exception") + +        errors = [] +        self.cur.execute("insert into test(name) values ('a')") +        self.cur.execute("select name from test") +        t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) +        t.start() +        t.join() +        if len(errors) > 0: +            self.fail("\n".join(errors)) + +class ConstructorTests(unittest.TestCase): +    def CheckDate(self): +        d = sqlite.Date(2004, 10, 28) + +    def CheckTime(self): +        t = sqlite.Time(12, 39, 35) + +    def CheckTimestamp(self): +        ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) + +    def CheckDateFromTicks(self): +        d = sqlite.DateFromTicks(42) + +    def CheckTimeFromTicks(self): +        t = sqlite.TimeFromTicks(42) + +    def CheckTimestampFromTicks(self): +        ts = sqlite.TimestampFromTicks(42) + +    def CheckBinary(self): +        b = sqlite.Binary(chr(0) + "'") + +class ExtensionTests(unittest.TestCase): +    def CheckScriptStringSql(self): +        con = sqlite.connect(":memory:") +        cur = con.cursor() +        cur.executescript(""" +            -- bla bla +            /* a stupid comment */ +            create table a(i); +            insert into a(i) values (5); +            """) +        cur.execute("select i from a") +        res = cur.fetchone()[0] +        self.assertEqual(res, 5) + +    def CheckScriptStringUnicode(self): +        con = sqlite.connect(":memory:") +        cur = con.cursor() +        cur.executescript(u""" +            create table a(i); +            insert into a(i) values (5); +            select i from a; +            delete from a; +            insert into a(i) values (6); +            """) +        cur.execute("select i from a") +        res = cur.fetchone()[0] +        self.assertEqual(res, 6) + +    def CheckScriptSyntaxError(self): +        con = sqlite.connect(":memory:") +        cur = con.cursor() +        raised = False +        try: +            cur.executescript("create table test(x); asdf; create table test2(x)") +        except sqlite.OperationalError: +            raised = True +        self.assertEqual(raised, True, "should have raised an exception") + +    def CheckScriptErrorNormal(self): +        con = sqlite.connect(":memory:") +        cur = con.cursor() +        raised = False +        try: +            cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") +        except sqlite.OperationalError: +            raised = True +        self.assertEqual(raised, True, "should have raised an exception") + +    def CheckConnectionExecute(self): +        con = sqlite.connect(":memory:") +        result = con.execute("select 5").fetchone()[0] +        self.assertEqual(result, 5, "Basic test of Connection.execute") + +    def CheckConnectionExecutemany(self): +        con = sqlite.connect(":memory:") +        con.execute("create table test(foo)") +        con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) +        result = con.execute("select foo from test order by foo").fetchall() +        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") +        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") + +    def CheckConnectionExecutescript(self): +        con = sqlite.connect(":memory:") +        con.executescript("create table test(foo); insert into test(foo) values (5);") +        result = con.execute("select foo from test").fetchone()[0] +        self.assertEqual(result, 5, "Basic test of Connection.executescript") + +class ClosedConTests(unittest.TestCase): +    def setUp(self): +        pass + +    def tearDown(self): +        pass + +    def CheckClosedConCursor(self): +        con = sqlite.connect(":memory:") +        con.close() +        try: +            cur = con.cursor() +            self.fail("Should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("Should have raised a ProgrammingError") + +    def CheckClosedConCommit(self): +        con = sqlite.connect(":memory:") +        con.close() +        try: +            con.commit() +            self.fail("Should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("Should have raised a ProgrammingError") + +    def CheckClosedConRollback(self): +        con = sqlite.connect(":memory:") +        con.close() +        try: +            con.rollback() +            self.fail("Should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("Should have raised a ProgrammingError") + +    def CheckClosedCurExecute(self): +        con = sqlite.connect(":memory:") +        cur = con.cursor() +        con.close() +        try: +            cur.execute("select 4") +            self.fail("Should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("Should have raised a ProgrammingError") + +    def CheckClosedCreateFunction(self): +        con = sqlite.connect(":memory:") +        con.close() +        def f(x): return 17 +        try: +            con.create_function("foo", 1, f) +            self.fail("Should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("Should have raised a ProgrammingError") + +    def CheckClosedCreateAggregate(self): +        con = sqlite.connect(":memory:") +        con.close() +        class Agg: +            def __init__(self): +                pass +            def step(self, x): +                pass +            def finalize(self): +                return 17 +        try: +            con.create_aggregate("foo", 1, Agg) +            self.fail("Should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("Should have raised a ProgrammingError") + +    def CheckClosedSetAuthorizer(self): +        con = sqlite.connect(":memory:") +        con.close() +        def authorizer(*args): +            return sqlite.DENY +        try: +            con.set_authorizer(authorizer) +            self.fail("Should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("Should have raised a ProgrammingError") + +    def CheckClosedSetProgressCallback(self): +        con = sqlite.connect(":memory:") +        con.close() +        def progress(): pass +        try: +            con.set_progress_handler(progress, 100) +            self.fail("Should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("Should have raised a ProgrammingError") + +    def CheckClosedCall(self): +        con = sqlite.connect(":memory:") +        con.close() +        try: +            con() +            self.fail("Should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("Should have raised a ProgrammingError") + +class ClosedCurTests(unittest.TestCase): +    def setUp(self): +        pass + +    def tearDown(self): +        pass + +    def CheckClosed(self): +        con = sqlite.connect(":memory:") +        cur = con.cursor() +        cur.close() + +        for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"): +            if method_name in ("execute", "executescript"): +                params = ("select 4 union select 5",) +            elif method_name == "executemany": +                params = ("insert into foo(bar) values (?)", [(3,), (4,)]) +            else: +                params = [] + +            try: +                method = getattr(cur, method_name) + +                method(*params) +                self.fail("Should have raised a ProgrammingError: method " + method_name) +            except sqlite.ProgrammingError: +                pass +            except: +                self.fail("Should have raised a ProgrammingError: " + method_name) + +def suite(): +    module_suite = unittest.makeSuite(ModuleTests, "Check") +    connection_suite = unittest.makeSuite(ConnectionTests, "Check") +    cursor_suite = unittest.makeSuite(CursorTests, "Check") +    thread_suite = unittest.makeSuite(ThreadTests, "Check") +    constructor_suite = unittest.makeSuite(ConstructorTests, "Check") +    ext_suite = unittest.makeSuite(ExtensionTests, "Check") +    closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") +    closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") +    return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite)) + +def test(): +    runner = unittest.TextTestRunner() +    runner.run(suite()) + +if __name__ == "__main__": +    test() diff --git a/lib/test/dump.py b/lib/test/dump.py new file mode 100644 index 0000000..b2f2d26 --- /dev/null +++ b/lib/test/dump.py @@ -0,0 +1,52 @@ +# Author: Paul Kippes <kippesp@gmail.com> + +import unittest +from pysqlcipher import dbapi2 as sqlite + +class DumpTests(unittest.TestCase): +    def setUp(self): +        self.cx = sqlite.connect(":memory:") +        self.cu = self.cx.cursor() + +    def tearDown(self): +        self.cx.close() + +    def CheckTableDump(self): +        expected_sqls = [ +                "CREATE TABLE t1(id integer primary key, s1 text, " \ +                "t1_i1 integer not null, i2 integer, unique (s1), " \ +                "constraint t1_idx1 unique (i2));" +                , +                "INSERT INTO \"t1\" VALUES(1,'foo',10,20);" +                , +                "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);" +                , +                "CREATE TABLE t2(id integer, t2_i1 integer, " \ +                "t2_i2 integer, primary key (id)," \ +                "foreign key(t2_i1) references t1(t1_i1));" +                , +                "CREATE TRIGGER trigger_1 update of t1_i1 on t1 " \ +                "begin " \ +                "update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; " \ +                "end;" +                , +                "CREATE VIEW v1 as select * from t1 left join t2 " \ +                "using (id);" +                ] +        [self.cu.execute(s) for s in expected_sqls] +        i = self.cx.iterdump() +        actual_sqls = [s for s in i] +        expected_sqls = ['BEGIN TRANSACTION;'] + expected_sqls + \ +            ['COMMIT;'] +        [self.assertEqual(expected_sqls[i], actual_sqls[i]) +            for i in xrange(len(expected_sqls))] + +def suite(): +    return unittest.TestSuite(unittest.makeSuite(DumpTests, "Check")) + +def test(): +    runner = unittest.TextTestRunner() +    runner.run(suite()) + +if __name__ == "__main__": +    test() diff --git a/lib/test/factory.py b/lib/test/factory.py new file mode 100644 index 0000000..a314d99 --- /dev/null +++ b/lib/test/factory.py @@ -0,0 +1,205 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/factory.py: tests for the various factories in pysqlite +# +# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty.  In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +#    claim that you wrote the original software. If you use this software +#    in a product, an acknowledgment in the product documentation would be +#    appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +#    misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import unittest +import pysqlcipher.dbapi2 as sqlite + +class MyConnection(sqlite.Connection): +    def __init__(self, *args, **kwargs): +        sqlite.Connection.__init__(self, *args, **kwargs) + +def dict_factory(cursor, row): +    d = {} +    for idx, col in enumerate(cursor.description): +        d[col[0]] = row[idx] +    return d + +class MyCursor(sqlite.Cursor): +    def __init__(self, *args, **kwargs): +        sqlite.Cursor.__init__(self, *args, **kwargs) +        self.row_factory = dict_factory + +class ConnectionFactoryTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:", factory=MyConnection) + +    def tearDown(self): +        self.con.close() + +    def CheckIsInstance(self): +        self.assertTrue(isinstance(self.con, +                                   MyConnection), +                        "connection is not instance of MyConnection") + +class CursorFactoryTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") + +    def tearDown(self): +        self.con.close() + +    def CheckIsInstance(self): +        cur = self.con.cursor(factory=MyCursor) +        self.assertTrue(isinstance(cur, +                                   MyCursor), +                        "cursor is not instance of MyCursor") + +class RowFactoryTestsBackwardsCompat(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") + +    def CheckIsProducedByFactory(self): +        cur = self.con.cursor(factory=MyCursor) +        cur.execute("select 4+5 as foo") +        row = cur.fetchone() +        self.assertTrue(isinstance(row, +                                   dict), +                        "row is not instance of dict") +        cur.close() + +    def tearDown(self): +        self.con.close() + +class RowFactoryTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") + +    def CheckCustomFactory(self): +        self.con.row_factory = lambda cur, row: list(row) +        row = self.con.execute("select 1, 2").fetchone() +        self.assertTrue(isinstance(row, +                                   list), +                        "row is not instance of list") + +    def CheckSqliteRowIndex(self): +        self.con.row_factory = sqlite.Row +        row = self.con.execute("select 1 as a, 2 as b").fetchone() +        self.assertTrue(isinstance(row, +                                   sqlite.Row), +                        "row is not instance of sqlite.Row") + +        col1, col2 = row["a"], row["b"] +        self.assertTrue(col1 == 1, "by name: wrong result for column 'a'") +        self.assertTrue(col2 == 2, "by name: wrong result for column 'a'") + +        col1, col2 = row["A"], row["B"] +        self.assertTrue(col1 == 1, "by name: wrong result for column 'A'") +        self.assertTrue(col2 == 2, "by name: wrong result for column 'B'") + +        col1, col2 = row[0], row[1] +        self.assertTrue(col1 == 1, "by index: wrong result for column 0") +        self.assertTrue(col2 == 2, "by index: wrong result for column 1") + +    def CheckSqliteRowIter(self): +        """Checks if the row object is iterable""" +        self.con.row_factory = sqlite.Row +        row = self.con.execute("select 1 as a, 2 as b").fetchone() +        for col in row: +            pass + +    def CheckSqliteRowAsTuple(self): +        """Checks if the row object can be converted to a tuple""" +        self.con.row_factory = sqlite.Row +        row = self.con.execute("select 1 as a, 2 as b").fetchone() +        t = tuple(row) + +    def CheckSqliteRowAsDict(self): +        """Checks if the row object can be correctly converted to a dictionary""" +        self.con.row_factory = sqlite.Row +        row = self.con.execute("select 1 as a, 2 as b").fetchone() +        d = dict(row) +        self.assertEqual(d["a"], row["a"]) +        self.assertEqual(d["b"], row["b"]) + +    def CheckSqliteRowHashCmp(self): +        """Checks if the row object compares and hashes correctly""" +        self.con.row_factory = sqlite.Row +        row_1 = self.con.execute("select 1 as a, 2 as b").fetchone() +        row_2 = self.con.execute("select 1 as a, 2 as b").fetchone() +        row_3 = self.con.execute("select 1 as a, 3 as b").fetchone() + +        self.assertTrue(row_1 == row_1) +        self.assertTrue(row_1 == row_2) +        self.assertTrue(row_2 != row_3) + +        self.assertFalse(row_1 != row_1) +        self.assertFalse(row_1 != row_2) +        self.assertFalse(row_2 == row_3) + +        self.assertEqual(row_1, row_2) +        self.assertEqual(hash(row_1), hash(row_2)) +        self.assertNotEqual(row_1, row_3) +        self.assertNotEqual(hash(row_1), hash(row_3)) + +    def tearDown(self): +        self.con.close() + +class TextFactoryTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") + +    def CheckUnicode(self): +        austria = unicode("Österreich", "latin1") +        row = self.con.execute("select ?", (austria,)).fetchone() +        self.assertTrue(type(row[0]) == unicode, "type of row[0] must be unicode") + +    def CheckString(self): +        self.con.text_factory = str +        austria = unicode("Österreich", "latin1") +        row = self.con.execute("select ?", (austria,)).fetchone() +        self.assertTrue(type(row[0]) == str, "type of row[0] must be str") +        self.assertTrue(row[0] == austria.encode("utf-8"), "column must equal original data in UTF-8") + +    def CheckCustom(self): +        self.con.text_factory = lambda x: unicode(x, "utf-8", "ignore") +        austria = unicode("Österreich", "latin1") +        row = self.con.execute("select ?", (austria.encode("latin1"),)).fetchone() +        self.assertTrue(type(row[0]) == unicode, "type of row[0] must be unicode") +        self.assertTrue(row[0].endswith(u"reich"), "column must contain original data") + +    def CheckOptimizedUnicode(self): +        self.con.text_factory = sqlite.OptimizedUnicode +        austria = unicode("Österreich", "latin1") +        germany = unicode("Deutchland") +        a_row = self.con.execute("select ?", (austria,)).fetchone() +        d_row = self.con.execute("select ?", (germany,)).fetchone() +        self.assertTrue(type(a_row[0]) == unicode, "type of non-ASCII row must be unicode") +        self.assertTrue(type(d_row[0]) == str, "type of ASCII-only row must be str") + +    def tearDown(self): +        self.con.close() + +def suite(): +    connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check") +    cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check") +    row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check") +    row_suite = unittest.makeSuite(RowFactoryTests, "Check") +    text_suite = unittest.makeSuite(TextFactoryTests, "Check") +    return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite)) + +def test(): +    runner = unittest.TextTestRunner() +    runner.run(suite()) + +if __name__ == "__main__": +    test() diff --git a/lib/test/hooks.py b/lib/test/hooks.py new file mode 100644 index 0000000..5a38060 --- /dev/null +++ b/lib/test/hooks.py @@ -0,0 +1,188 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/hooks.py: tests for various SQLite-specific hooks +# +# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty.  In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +#    claim that you wrote the original software. If you use this software +#    in a product, an acknowledgment in the product documentation would be +#    appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +#    misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import os, unittest +import pysqlcipher.dbapi2 as sqlite + +class CollationTests(unittest.TestCase): +    def setUp(self): +        pass + +    def tearDown(self): +        pass + +    def CheckCreateCollationNotCallable(self): +        con = sqlite.connect(":memory:") +        try: +            con.create_collation("X", 42) +            self.fail("should have raised a TypeError") +        except TypeError, e: +            self.assertEqual(e.args[0], "parameter must be callable") + +    def CheckCreateCollationNotAscii(self): +        con = sqlite.connect(":memory:") +        try: +            con.create_collation("collä", cmp) +            self.fail("should have raised a ProgrammingError") +        except sqlite.ProgrammingError, e: +            pass + +    def CheckCollationIsUsed(self): +        if sqlite.version_info < (3, 2, 1):  # old SQLite versions crash on this test +            return +        def mycoll(x, y): +            # reverse order +            return -cmp(x, y) + +        con = sqlite.connect(":memory:") +        con.create_collation("mycoll", mycoll) +        sql = """ +            select x from ( +            select 'a' as x +            union +            select 'b' as x +            union +            select 'c' as x +            ) order by x collate mycoll +            """ +        result = con.execute(sql).fetchall() +        if result[0][0] != "c" or result[1][0] != "b" or result[2][0] != "a": +            self.fail("the expected order was not returned") + +        con.create_collation("mycoll", None) +        try: +            result = con.execute(sql).fetchall() +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError, e: +            self.assertEqual(e.args[0].lower(), "no such collation sequence: mycoll") + +    def CheckCollationRegisterTwice(self): +        """ +        Register two different collation functions under the same name. +        Verify that the last one is actually used. +        """ +        con = sqlite.connect(":memory:") +        con.create_collation("mycoll", cmp) +        con.create_collation("mycoll", lambda x, y: -cmp(x, y)) +        result = con.execute(""" +            select x from (select 'a' as x union select 'b' as x) order by x collate mycoll +            """).fetchall() +        if result[0][0] != 'b' or result[1][0] != 'a': +            self.fail("wrong collation function is used") + +    def CheckDeregisterCollation(self): +        """ +        Register a collation, then deregister it. Make sure an error is raised if we try +        to use it. +        """ +        con = sqlite.connect(":memory:") +        con.create_collation("mycoll", cmp) +        con.create_collation("mycoll", None) +        try: +            con.execute("select 'a' as x union select 'b' as x order by x collate mycoll") +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError, e: +            if not e.args[0].startswith("no such collation sequence"): +                self.fail("wrong OperationalError raised") + +class ProgressTests(unittest.TestCase): +    def CheckProgressHandlerUsed(self): +        """ +        Test that the progress handler is invoked once it is set. +        """ +        con = sqlite.connect(":memory:") +        progress_calls = [] +        def progress(): +            progress_calls.append(None) +            return 0 +        con.set_progress_handler(progress, 1) +        con.execute(""" +            create table foo(a, b) +            """) +        self.assertTrue(progress_calls) + + +    def CheckOpcodeCount(self): +        """ +        Test that the opcode argument is respected. +        """ +        con = sqlite.connect(":memory:") +        progress_calls = [] +        def progress(): +            progress_calls.append(None) +            return 0 +        con.set_progress_handler(progress, 1) +        curs = con.cursor() +        curs.execute(""" +            create table foo (a, b) +            """) +        first_count = len(progress_calls) +        progress_calls = [] +        con.set_progress_handler(progress, 2) +        curs.execute(""" +            create table bar (a, b) +            """) +        second_count = len(progress_calls) +        self.assertTrue(first_count > second_count) + +    def CheckCancelOperation(self): +        """ +        Test that returning a non-zero value stops the operation in progress. +        """ +        con = sqlite.connect(":memory:") +        progress_calls = [] +        def progress(): +            progress_calls.append(None) +            return 1 +        con.set_progress_handler(progress, 1) +        curs = con.cursor() +        self.assertRaises( +            sqlite.OperationalError, +            curs.execute, +            "create table bar (a, b)") + +    def CheckClearHandler(self): +        """ +        Test that setting the progress handler to None clears the previously set handler. +        """ +        con = sqlite.connect(":memory:") +        action = 0 +        def progress(): +            action = 1 +            return 0 +        con.set_progress_handler(progress, 1) +        con.set_progress_handler(None, 1) +        con.execute("select 1 union select 2 union select 3").fetchall() +        self.assertEqual(action, 0, "progress handler was not cleared") + +def suite(): +    collation_suite = unittest.makeSuite(CollationTests, "Check") +    progress_suite = unittest.makeSuite(ProgressTests, "Check") +    return unittest.TestSuite((collation_suite, progress_suite)) + +def test(): +    runner = unittest.TextTestRunner() +    runner.run(suite()) + +if __name__ == "__main__": +    test() diff --git a/lib/test/py25/__init__.py b/lib/test/py25/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/lib/test/py25/__init__.py @@ -0,0 +1 @@ + diff --git a/lib/test/py25/py25tests.py b/lib/test/py25/py25tests.py new file mode 100644 index 0000000..ede1279 --- /dev/null +++ b/lib/test/py25/py25tests.py @@ -0,0 +1,80 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/regression.py: pysqlite regression tests +# +# Copyright (C) 2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty.  In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +#    claim that you wrote the original software. If you use this software +#    in a product, an acknowledgment in the product documentation would be +#    appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +#    misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +from __future__ import with_statement +import unittest +import pysqlcipher.dbapi2 as sqlite + +did_rollback = False + +class MyConnection(sqlite.Connection): +    def rollback(self): +        global did_rollback +        did_rollback = True +        sqlite.Connection.rollback(self) + +class ContextTests(unittest.TestCase): +    def setUp(self): +        global did_rollback +        self.con = sqlite.connect(":memory:", factory=MyConnection) +        self.con.execute("create table test(c unique)") +        did_rollback = False + +    def tearDown(self): +        self.con.close() + +    def CheckContextManager(self): +        """Can the connection be used as a context manager at all?""" +        with self.con: +            pass + +    def CheckContextManagerCommit(self): +        """Is a commit called in the context manager?""" +        with self.con: +            self.con.execute("insert into test(c) values ('foo')") +        self.con.rollback() +        count = self.con.execute("select count(*) from test").fetchone()[0] +        self.assertEqual(count, 1) + +    def CheckContextManagerRollback(self): +        """Is a rollback called in the context manager?""" +        global did_rollback +        self.assertEqual(did_rollback, False) +        try: +            with self.con: +                self.con.execute("insert into test(c) values (4)") +                self.con.execute("insert into test(c) values (4)") +        except sqlite.IntegrityError: +            pass +        self.assertEqual(did_rollback, True) + +def suite(): +    ctx_suite = unittest.makeSuite(ContextTests, "Check") +    return unittest.TestSuite((ctx_suite,)) + +def test(): +    runner = unittest.TextTestRunner() +    runner.run(suite()) + +if __name__ == "__main__": +    test() diff --git a/lib/test/regression.py b/lib/test/regression.py new file mode 100644 index 0000000..4003742 --- /dev/null +++ b/lib/test/regression.py @@ -0,0 +1,271 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/regression.py: pysqlite regression tests +# +# Copyright (C) 2006-2009 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty.  In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +#    claim that you wrote the original software. If you use this software +#    in a product, an acknowledgment in the product documentation would be +#    appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +#    misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import datetime +import unittest +import pysqlcipher.dbapi2 as sqlite + +class RegressionTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") + +    def tearDown(self): +        self.con.close() + +    def CheckPragmaUserVersion(self): +        # This used to crash pysqlite because this pragma command returns NULL for the column name +        cur = self.con.cursor() +        cur.execute("pragma user_version") + +    def CheckPragmaSchemaVersion(self): +        # This still crashed pysqlite <= 2.2.1 +        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) +        try: +            cur = self.con.cursor() +            cur.execute("pragma schema_version") +        finally: +            cur.close() +            con.close() + +    def CheckStatementReset(self): +        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are +        # reset before a rollback, but only those that are still in the +        # statement cache. The others are not accessible from the connection object. +        con = sqlite.connect(":memory:", cached_statements=5) +        cursors = [con.cursor() for x in xrange(5)] +        cursors[0].execute("create table test(x)") +        for i in range(10): +            cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in xrange(10)]) + +        for i in range(5): +            cursors[i].execute(" " * i + "select x from test") + +        con.rollback() + +    def CheckColumnNameWithSpaces(self): +        cur = self.con.cursor() +        cur.execute('select 1 as "foo bar [datetime]"') +        self.assertEqual(cur.description[0][0], "foo bar") + +        cur.execute('select 1 as "foo baz"') +        self.assertEqual(cur.description[0][0], "foo baz") + +    def CheckStatementFinalizationOnCloseDb(self): +        # pysqlite versions <= 2.3.3 only finalized statements in the statement +        # cache when closing the database. statements that were still +        # referenced in cursors weren't closed an could provoke " +        # "OperationalError: Unable to close due to unfinalised statements". +        con = sqlite.connect(":memory:") +        cursors = [] +        # default statement cache size is 100 +        for i in range(105): +            cur = con.cursor() +            cursors.append(cur) +            cur.execute("select 1 x union select " + str(i)) +        con.close() + +    def CheckOnConflictRollback(self): +        if sqlite.sqlite_version_info < (3, 2, 2): +            return +        con = sqlite.connect(":memory:") +        con.execute("create table foo(x, unique(x) on conflict rollback)") +        con.execute("insert into foo(x) values (1)") +        try: +            con.execute("insert into foo(x) values (1)") +        except sqlite.DatabaseError: +            pass +        con.execute("insert into foo(x) values (2)") +        try: +            con.commit() +        except sqlite.OperationalError: +            self.fail("pysqlite knew nothing about the implicit ROLLBACK") + +    def CheckWorkaroundForBuggySqliteTransferBindings(self): +        """ +        pysqlite would crash with older SQLite versions unless +        a workaround is implemented. +        """ +        self.con.execute("create table foo(bar)") +        self.con.execute("drop table foo") +        self.con.execute("create table foo(bar)") + +    def CheckEmptyStatement(self): +        """ +        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL +        for "no-operation" statements +        """ +        self.con.execute("") + +    def CheckUnicodeConnect(self): +        """ +        With pysqlite 2.4.0 you needed to use a string or a APSW connection +        object for opening database connections. + +        Formerly, both bytestrings and unicode strings used to work. + +        Let's make sure unicode strings work in the future. +        """ +        con = sqlite.connect(u":memory:") +        con.close() + +    def CheckTypeMapUsage(self): +        """ +        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling +        a statement. This test exhibits the problem. +        """ +        SELECT = "select * from foo" +        con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) +        con.execute("create table foo(bar timestamp)") +        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),)) +        con.execute(SELECT) +        con.execute("drop table foo") +        con.execute("create table foo(bar integer)") +        con.execute("insert into foo(bar) values (5)") +        con.execute(SELECT) + +    def CheckRegisterAdapter(self): +        """ +        See issue 3312. +        """ +        self.assertRaises(TypeError, sqlite.register_adapter, {}, None) + +    def CheckSetIsolationLevel(self): +        """ +        See issue 3312. +        """ +        con = sqlite.connect(":memory:") +        self.assertRaises(UnicodeEncodeError, setattr, con, +                          "isolation_level", u"\xe9") + +    def CheckCursorConstructorCallCheck(self): +        """ +        Verifies that cursor methods check wether base class __init__ was called. +        """ +        class Cursor(sqlite.Cursor): +            def __init__(self, con): +                pass + +        con = sqlite.connect(":memory:") +        cur = Cursor(con) +        try: +            cur.execute("select 4+5").fetchall() +            self.fail("should have raised ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("should have raised ProgrammingError") + +    def CheckConnectionConstructorCallCheck(self): +        """ +        Verifies that connection methods check wether base class __init__ was called. +        """ +        class Connection(sqlite.Connection): +            def __init__(self, name): +                pass + +        con = Connection(":memory:") +        try: +            cur = con.cursor() +            self.fail("should have raised ProgrammingError") +        except sqlite.ProgrammingError: +            pass +        except: +            self.fail("should have raised ProgrammingError") + +    def CheckCursorRegistration(self): +        """ +        Verifies that subclassed cursor classes are correctly registered with +        the connection object, too.  (fetch-across-rollback problem) +        """ +        class Connection(sqlite.Connection): +            def cursor(self): +                return Cursor(self) + +        class Cursor(sqlite.Cursor): +            def __init__(self, con): +                sqlite.Cursor.__init__(self, con) + +        con = Connection(":memory:") +        cur = con.cursor() +        cur.execute("create table foo(x)") +        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)]) +        cur.execute("select x from foo") +        con.rollback() +        try: +            cur.fetchall() +            self.fail("should have raised InterfaceError") +        except sqlite.InterfaceError: +            pass +        except: +            self.fail("should have raised InterfaceError") + +    def CheckAutoCommit(self): +        """ +        Verifies that creating a connection in autocommit mode works. +        2.5.3 introduced a regression so that these could no longer +        be created. +        """ +        con = sqlite.connect(":memory:", isolation_level=None) + +    def CheckPragmaAutocommit(self): +        """ +        Verifies that running a PRAGMA statement that does an autocommit does +        work. This did not work in 2.5.3/2.5.4. +        """ +        con = sqlite.connect(":memory:") +        cur = con.cursor() +        cur.execute("create table foo(bar)") +        cur.execute("insert into foo(bar) values (5)") + +        cur.execute("pragma page_size") +        row = cur.fetchone() + +    def CheckSetDict(self): +        """ +        See http://bugs.python.org/issue7478 + +        It was possible to successfully register callbacks that could not be +        hashed. Return codes of PyDict_SetItem were not checked properly. +        """ +        class NotHashable: +            def __call__(self, *args, **kw): +                pass +            def __hash__(self): +                raise TypeError() +        var = NotHashable() +        con = sqlite.connect(":memory:") +        self.assertRaises(TypeError, con.create_function, var) +        self.assertRaises(TypeError, con.create_aggregate, var) +        self.assertRaises(TypeError, con.set_authorizer, var) +        self.assertRaises(TypeError, con.set_progress_handler, var) + +def suite(): +    regression_suite = unittest.makeSuite(RegressionTests, "Check") +    return unittest.TestSuite((regression_suite,)) + +def test(): +    runner = unittest.TextTestRunner() +    runner.run(suite()) + +if __name__ == "__main__": +    test() diff --git a/lib/test/transactions.py b/lib/test/transactions.py new file mode 100644 index 0000000..4e3aa08 --- /dev/null +++ b/lib/test/transactions.py @@ -0,0 +1,205 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/transactions.py: tests transactions +# +# Copyright (C) 2005-2009 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty.  In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +#    claim that you wrote the original software. If you use this software +#    in a product, an acknowledgment in the product documentation would be +#    appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +#    misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import sys +import os, unittest +import pysqlcipher.dbapi2 as sqlite + +def get_db_path(): +    return "sqlite_testdb" + +class TransactionTests(unittest.TestCase): +    def setUp(self): +        try: +            os.remove(get_db_path()) +        except OSError: +            pass + +        self.con1 = sqlite.connect(get_db_path(), timeout=0.1) +        self.cur1 = self.con1.cursor() + +        self.con2 = sqlite.connect(get_db_path(), timeout=0.1) +        self.cur2 = self.con2.cursor() + +    def tearDown(self): +        self.cur1.close() +        self.con1.close() + +        self.cur2.close() +        self.con2.close() + +        try: +            os.unlink(get_db_path()) +        except OSError: +            pass + +    def CheckDMLdoesAutoCommitBefore(self): +        self.cur1.execute("create table test(i)") +        self.cur1.execute("insert into test(i) values (5)") +        self.cur1.execute("create table test2(j)") +        self.cur2.execute("select i from test") +        res = self.cur2.fetchall() +        self.assertEqual(len(res), 1) + +    def CheckInsertStartsTransaction(self): +        self.cur1.execute("create table test(i)") +        self.cur1.execute("insert into test(i) values (5)") +        self.cur2.execute("select i from test") +        res = self.cur2.fetchall() +        self.assertEqual(len(res), 0) + +    def CheckUpdateStartsTransaction(self): +        self.cur1.execute("create table test(i)") +        self.cur1.execute("insert into test(i) values (5)") +        self.con1.commit() +        self.cur1.execute("update test set i=6") +        self.cur2.execute("select i from test") +        res = self.cur2.fetchone()[0] +        self.assertEqual(res, 5) + +    def CheckDeleteStartsTransaction(self): +        self.cur1.execute("create table test(i)") +        self.cur1.execute("insert into test(i) values (5)") +        self.con1.commit() +        self.cur1.execute("delete from test") +        self.cur2.execute("select i from test") +        res = self.cur2.fetchall() +        self.assertEqual(len(res), 1) + +    def CheckReplaceStartsTransaction(self): +        self.cur1.execute("create table test(i)") +        self.cur1.execute("insert into test(i) values (5)") +        self.con1.commit() +        self.cur1.execute("replace into test(i) values (6)") +        self.cur2.execute("select i from test") +        res = self.cur2.fetchall() +        self.assertEqual(len(res), 1) +        self.assertEqual(res[0][0], 5) + +    def CheckToggleAutoCommit(self): +        self.cur1.execute("create table test(i)") +        self.cur1.execute("insert into test(i) values (5)") +        self.con1.isolation_level = None +        self.assertEqual(self.con1.isolation_level, None) +        self.cur2.execute("select i from test") +        res = self.cur2.fetchall() +        self.assertEqual(len(res), 1) + +        self.con1.isolation_level = "DEFERRED" +        self.assertEqual(self.con1.isolation_level , "DEFERRED") +        self.cur1.execute("insert into test(i) values (5)") +        self.cur2.execute("select i from test") +        res = self.cur2.fetchall() +        self.assertEqual(len(res), 1) + +    def CheckRaiseTimeout(self): +        if sqlite.sqlite_version_info < (3, 2, 2): +            # This will fail (hang) on earlier versions of sqlite. +            # Determine exact version it was fixed. 3.2.1 hangs. +            return +        self.cur1.execute("create table test(i)") +        self.cur1.execute("insert into test(i) values (5)") +        try: +            self.cur2.execute("insert into test(i) values (5)") +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError: +            pass +        except: +            self.fail("should have raised an OperationalError") + +    def CheckLocking(self): +        """ +        This tests the improved concurrency with pysqlite 2.3.4. You needed +        to roll back con2 before you could commit con1. +        """ +        if sqlite.sqlite_version_info < (3, 2, 2): +            # This will fail (hang) on earlier versions of sqlite. +            # Determine exact version it was fixed. 3.2.1 hangs. +            return +        self.cur1.execute("create table test(i)") +        self.cur1.execute("insert into test(i) values (5)") +        try: +            self.cur2.execute("insert into test(i) values (5)") +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError: +            pass +        except: +            self.fail("should have raised an OperationalError") +        # NO self.con2.rollback() HERE!!! +        self.con1.commit() + +    def CheckRollbackCursorConsistency(self): +        """ +        Checks if cursors on the connection are set into a "reset" state +        when a rollback is done on the connection. +        """ +        con = sqlite.connect(":memory:") +        cur = con.cursor() +        cur.execute("create table test(x)") +        cur.execute("insert into test(x) values (5)") +        cur.execute("select 1 union select 2 union select 3") + +        con.rollback() +        try: +            cur.fetchall() +            self.fail("InterfaceError should have been raised") +        except sqlite.InterfaceError, e: +            pass +        except: +            self.fail("InterfaceError should have been raised") + +class SpecialCommandTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") +        self.cur = self.con.cursor() + +    def CheckVacuum(self): +        self.cur.execute("create table test(i)") +        self.cur.execute("insert into test(i) values (5)") +        self.cur.execute("vacuum") + +    def CheckDropTable(self): +        self.cur.execute("create table test(i)") +        self.cur.execute("insert into test(i) values (5)") +        self.cur.execute("drop table test") + +    def CheckPragma(self): +        self.cur.execute("create table test(i)") +        self.cur.execute("insert into test(i) values (5)") +        self.cur.execute("pragma count_changes=1") + +    def tearDown(self): +        self.cur.close() +        self.con.close() + +def suite(): +    default_suite = unittest.makeSuite(TransactionTests, "Check") +    special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check") +    return unittest.TestSuite((default_suite, special_command_suite)) + +def test(): +    runner = unittest.TextTestRunner() +    runner.run(suite()) + +if __name__ == "__main__": +    test() diff --git a/lib/test/types.py b/lib/test/types.py new file mode 100644 index 0000000..fc757be --- /dev/null +++ b/lib/test/types.py @@ -0,0 +1,414 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/types.py: tests for type conversion and detection +# +# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty.  In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +#    claim that you wrote the original software. If you use this software +#    in a product, an acknowledgment in the product documentation would be +#    appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +#    misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import datetime +import unittest +import pysqlcipher.dbapi2 as sqlite +import zlib + + +class SqliteTypeTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") +        self.cur = self.con.cursor() +        self.cur.execute("create table test(i integer, s varchar, f number, b blob)") + +    def tearDown(self): +        self.cur.close() +        self.con.close() + +    def CheckString(self): +        self.cur.execute("insert into test(s) values (?)", (u"Österreich",)) +        self.cur.execute("select s from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], u"Österreich") + +    def CheckSmallInt(self): +        self.cur.execute("insert into test(i) values (?)", (42,)) +        self.cur.execute("select i from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], 42) + +    def CheckLargeInt(self): +        num = 2**40 +        self.cur.execute("insert into test(i) values (?)", (num,)) +        self.cur.execute("select i from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], num) + +    def CheckFloat(self): +        val = 3.14 +        self.cur.execute("insert into test(f) values (?)", (val,)) +        self.cur.execute("select f from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], val) + +    def CheckBlob(self): +        val = buffer("Guglhupf") +        self.cur.execute("insert into test(b) values (?)", (val,)) +        self.cur.execute("select b from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], val) + +    def CheckUnicodeExecute(self): +        self.cur.execute(u"select 'Österreich'") +        row = self.cur.fetchone() +        self.assertEqual(row[0], u"Österreich") + +    def CheckNonUtf8_Default(self): +        try: +            self.cur.execute("select ?", (chr(150),)) +            self.fail("should have raised a ProgrammingError") +        except sqlite.ProgrammingError: +            pass + +    def CheckNonUtf8_TextFactoryString(self): +        orig_text_factory = self.con.text_factory +        try: +            self.con.text_factory = str +            self.cur.execute("select ?", (chr(150),)) +        finally: +            self.con.text_factory = orig_text_factory + +    def CheckNonUtf8_TextFactoryOptimizedUnicode(self): +        orig_text_factory = self.con.text_factory +        try: +            try: +                self.con.text_factory = sqlite.OptimizedUnicode +                self.cur.execute("select ?", (chr(150),)) +                self.fail("should have raised a ProgrammingError") +            except sqlite.ProgrammingError: +                pass +        finally: +            self.con.text_factory = orig_text_factory + +class DeclTypesTests(unittest.TestCase): +    class Foo: +        def __init__(self, _val): +            self.val = _val + +        def __cmp__(self, other): +            if not isinstance(other, DeclTypesTests.Foo): +                raise ValueError +            if self.val == other.val: +                return 0 +            else: +                return 1 + +        def __conform__(self, protocol): +            if protocol is sqlite.PrepareProtocol: +                return self.val +            else: +                return None + +        def __str__(self): +            return "<%s>" % self.val + +    def setUp(self): +        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) +        self.cur = self.con.cursor() +        self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob, n1 number, n2 number(5))") + +        # override float, make them always return the same number +        sqlite.converters["FLOAT"] = lambda x: 47.2 + +        # and implement two custom ones +        sqlite.converters["BOOL"] = lambda x: bool(int(x)) +        sqlite.converters["FOO"] = DeclTypesTests.Foo +        sqlite.converters["WRONG"] = lambda x: "WRONG" +        sqlite.converters["NUMBER"] = float + +    def tearDown(self): +        del sqlite.converters["FLOAT"] +        del sqlite.converters["BOOL"] +        del sqlite.converters["FOO"] +        del sqlite.converters["NUMBER"] +        self.cur.close() +        self.con.close() + +    def CheckString(self): +        # default +        self.cur.execute("insert into test(s) values (?)", ("foo",)) +        self.cur.execute('select s as "s [WRONG]" from test') +        row = self.cur.fetchone() +        self.assertEqual(row[0], "foo") + +    def CheckSmallInt(self): +        # default +        self.cur.execute("insert into test(i) values (?)", (42,)) +        self.cur.execute("select i from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], 42) + +    def CheckLargeInt(self): +        # default +        num = 2**40 +        self.cur.execute("insert into test(i) values (?)", (num,)) +        self.cur.execute("select i from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], num) + +    def CheckFloat(self): +        # custom +        val = 3.14 +        self.cur.execute("insert into test(f) values (?)", (val,)) +        self.cur.execute("select f from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], 47.2) + +    def CheckBool(self): +        # custom +        self.cur.execute("insert into test(b) values (?)", (False,)) +        self.cur.execute("select b from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], False) + +        self.cur.execute("delete from test") +        self.cur.execute("insert into test(b) values (?)", (True,)) +        self.cur.execute("select b from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], True) + +    def CheckUnicode(self): +        # default +        val = u"\xd6sterreich" +        self.cur.execute("insert into test(u) values (?)", (val,)) +        self.cur.execute("select u from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], val) + +    def CheckFoo(self): +        val = DeclTypesTests.Foo("bla") +        self.cur.execute("insert into test(foo) values (?)", (val,)) +        self.cur.execute("select foo from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], val) + +    def CheckUnsupportedSeq(self): +        class Bar: pass +        val = Bar() +        try: +            self.cur.execute("insert into test(f) values (?)", (val,)) +            self.fail("should have raised an InterfaceError") +        except sqlite.InterfaceError: +            pass +        except: +            self.fail("should have raised an InterfaceError") + +    def CheckUnsupportedDict(self): +        class Bar: pass +        val = Bar() +        try: +            self.cur.execute("insert into test(f) values (:val)", {"val": val}) +            self.fail("should have raised an InterfaceError") +        except sqlite.InterfaceError: +            pass +        except: +            self.fail("should have raised an InterfaceError") + +    def CheckBlob(self): +        # default +        val = buffer("Guglhupf") +        self.cur.execute("insert into test(bin) values (?)", (val,)) +        self.cur.execute("select bin from test") +        row = self.cur.fetchone() +        self.assertEqual(row[0], val) + +    def CheckNumber1(self): +        self.cur.execute("insert into test(n1) values (5)") +        value = self.cur.execute("select n1 from test").fetchone()[0] +        # if the converter is not used, it's an int instead of a float +        self.assertEqual(type(value), float) + +    def CheckNumber2(self): +        """Checks wether converter names are cut off at '(' characters""" +        self.cur.execute("insert into test(n2) values (5)") +        value = self.cur.execute("select n2 from test").fetchone()[0] +        # if the converter is not used, it's an int instead of a float +        self.assertEqual(type(value), float) + +class ColNamesTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) +        self.cur = self.con.cursor() +        self.cur.execute("create table test(x foo)") + +        sqlite.converters["FOO"] = lambda x: "[%s]" % x +        sqlite.converters["BAR"] = lambda x: "<%s>" % x +        sqlite.converters["EXC"] = lambda x: 5 // 0 +        sqlite.converters["B1B1"] = lambda x: "MARKER" + +    def tearDown(self): +        del sqlite.converters["FOO"] +        del sqlite.converters["BAR"] +        del sqlite.converters["EXC"] +        del sqlite.converters["B1B1"] +        self.cur.close() +        self.con.close() + +    def CheckDeclTypeNotUsed(self): +        """ +        Assures that the declared type is not used when PARSE_DECLTYPES +        is not set. +        """ +        self.cur.execute("insert into test(x) values (?)", ("xxx",)) +        self.cur.execute("select x from test") +        val = self.cur.fetchone()[0] +        self.assertEqual(val, "xxx") + +    def CheckNone(self): +        self.cur.execute("insert into test(x) values (?)", (None,)) +        self.cur.execute("select x from test") +        val = self.cur.fetchone()[0] +        self.assertEqual(val, None) + +    def CheckColName(self): +        self.cur.execute("insert into test(x) values (?)", ("xxx",)) +        self.cur.execute('select x as "x [bar]" from test') +        val = self.cur.fetchone()[0] +        self.assertEqual(val, "<xxx>") + +        # Check if the stripping of colnames works. Everything after the first +        # whitespace should be stripped. +        self.assertEqual(self.cur.description[0][0], "x") + +    def CheckCaseInConverterName(self): +        self.cur.execute("""select 'other' as "x [b1b1]\"""") +        val = self.cur.fetchone()[0] +        self.assertEqual(val, "MARKER") + +    def CheckCursorDescriptionNoRow(self): +        """ +        cursor.description should at least provide the column name(s), even if +        no row returned. +        """ +        self.cur.execute("select * from test where 0 = 1") +        self.assert_(self.cur.description[0][0] == "x") + +class ObjectAdaptationTests(unittest.TestCase): +    def cast(obj): +        return float(obj) +    cast = staticmethod(cast) + +    def setUp(self): +        self.con = sqlite.connect(":memory:") +        try: +            del sqlite.adapters[int] +        except: +            pass +        sqlite.register_adapter(int, ObjectAdaptationTests.cast) +        self.cur = self.con.cursor() + +    def tearDown(self): +        del sqlite.adapters[(int, sqlite.PrepareProtocol)] +        self.cur.close() +        self.con.close() + +    def CheckCasterIsUsed(self): +        self.cur.execute("select ?", (4,)) +        val = self.cur.fetchone()[0] +        self.assertEqual(type(val), float) + +class BinaryConverterTests(unittest.TestCase): +    def convert(s): +        return zlib.decompress(s) +    convert = staticmethod(convert) + +    def setUp(self): +        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) +        sqlite.register_converter("bin", BinaryConverterTests.convert) + +    def tearDown(self): +        self.con.close() + +    def CheckBinaryInputForConverter(self): +        testdata = "abcdefg" * 10 +        result = self.con.execute('select ? as "x [bin]"', (buffer(zlib.compress(testdata)),)).fetchone()[0] +        self.assertEqual(testdata, result) + +class DateTimeTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) +        self.cur = self.con.cursor() +        self.cur.execute("create table test(d date, ts timestamp)") + +    def tearDown(self): +        self.cur.close() +        self.con.close() + +    def CheckSqliteDate(self): +        d = sqlite.Date(2004, 2, 14) +        self.cur.execute("insert into test(d) values (?)", (d,)) +        self.cur.execute("select d from test") +        d2 = self.cur.fetchone()[0] +        self.assertEqual(d, d2) + +    def CheckSqliteTimestamp(self): +        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0) +        self.cur.execute("insert into test(ts) values (?)", (ts,)) +        self.cur.execute("select ts from test") +        ts2 = self.cur.fetchone()[0] +        self.assertEqual(ts, ts2) + +    def CheckSqlTimestamp(self): +        # The date functions are only available in SQLite version 3.1 or later +        if sqlite.sqlite_version_info < (3, 1): +            return + +        # SQLite's current_timestamp uses UTC time, while datetime.datetime.now() uses local time. +        now = datetime.datetime.now() +        self.cur.execute("insert into test(ts) values (current_timestamp)") +        self.cur.execute("select ts from test") +        ts = self.cur.fetchone()[0] +        self.assertEqual(type(ts), datetime.datetime) +        self.assertEqual(ts.year, now.year) + +    def CheckDateTimeSubSeconds(self): +        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000) +        self.cur.execute("insert into test(ts) values (?)", (ts,)) +        self.cur.execute("select ts from test") +        ts2 = self.cur.fetchone()[0] +        self.assertEqual(ts, ts2) + +    def CheckDateTimeSubSecondsFloatingPoint(self): +        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241) +        self.cur.execute("insert into test(ts) values (?)", (ts,)) +        self.cur.execute("select ts from test") +        ts2 = self.cur.fetchone()[0] +        self.assertEqual(ts, ts2) + +def suite(): +    sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check") +    decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check") +    colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check") +    adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check") +    bin_suite = unittest.makeSuite(BinaryConverterTests, "Check") +    date_suite = unittest.makeSuite(DateTimeTests, "Check") +    return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite)) + +def test(): +    runner = unittest.TextTestRunner() +    runner.run(suite()) + +if __name__ == "__main__": +    test() diff --git a/lib/test/userfunctions.py b/lib/test/userfunctions.py new file mode 100644 index 0000000..6b60f60 --- /dev/null +++ b/lib/test/userfunctions.py @@ -0,0 +1,413 @@ +#-*- coding: ISO-8859-1 -*- +# pysqlite2/test/userfunctions.py: tests for user-defined functions and +#                                  aggregates. +# +# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> +# +# This file is part of pysqlite. +# +# This software is provided 'as-is', without any express or implied +# warranty.  In no event will the authors be held liable for any damages +# arising from the use of this software. +# +# Permission is granted to anyone to use this software for any purpose, +# including commercial applications, and to alter it and redistribute it +# freely, subject to the following restrictions: +# +# 1. The origin of this software must not be misrepresented; you must not +#    claim that you wrote the original software. If you use this software +#    in a product, an acknowledgment in the product documentation would be +#    appreciated but is not required. +# 2. Altered source versions must be plainly marked as such, and must not be +#    misrepresented as being the original software. +# 3. This notice may not be removed or altered from any source distribution. + +import unittest +import pysqlcipher.dbapi2 as sqlite + +def func_returntext(): +    return "foo" +def func_returnunicode(): +    return u"bar" +def func_returnint(): +    return 42 +def func_returnfloat(): +    return 3.14 +def func_returnnull(): +    return None +def func_returnblob(): +    return buffer("blob") +def func_raiseexception(): +    5 // 0 + +def func_isstring(v): +    return type(v) is unicode +def func_isint(v): +    return type(v) is int +def func_isfloat(v): +    return type(v) is float +def func_isnone(v): +    return type(v) is type(None) +def func_isblob(v): +    return type(v) is buffer + +class AggrNoStep: +    def __init__(self): +        pass + +    def finalize(self): +        return 1 + +class AggrNoFinalize: +    def __init__(self): +        pass + +    def step(self, x): +        pass + +class AggrExceptionInInit: +    def __init__(self): +        5 // 0 + +    def step(self, x): +        pass + +    def finalize(self): +        pass + +class AggrExceptionInStep: +    def __init__(self): +        pass + +    def step(self, x): +        5 // 0 + +    def finalize(self): +        return 42 + +class AggrExceptionInFinalize: +    def __init__(self): +        pass + +    def step(self, x): +        pass + +    def finalize(self): +        5 // 0 + +class AggrCheckType: +    def __init__(self): +        self.val = None + +    def step(self, whichType, val): +        theType = {"str": unicode, "int": int, "float": float, "None": type(None), "blob": buffer} +        self.val = int(theType[whichType] is type(val)) + +    def finalize(self): +        return self.val + +class AggrSum: +    def __init__(self): +        self.val = 0.0 + +    def step(self, val): +        self.val += val + +    def finalize(self): +        return self.val + +class FunctionTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") + +        self.con.create_function("returntext", 0, func_returntext) +        self.con.create_function("returnunicode", 0, func_returnunicode) +        self.con.create_function("returnint", 0, func_returnint) +        self.con.create_function("returnfloat", 0, func_returnfloat) +        self.con.create_function("returnnull", 0, func_returnnull) +        self.con.create_function("returnblob", 0, func_returnblob) +        self.con.create_function("raiseexception", 0, func_raiseexception) + +        self.con.create_function("isstring", 1, func_isstring) +        self.con.create_function("isint", 1, func_isint) +        self.con.create_function("isfloat", 1, func_isfloat) +        self.con.create_function("isnone", 1, func_isnone) +        self.con.create_function("isblob", 1, func_isblob) + +    def tearDown(self): +        self.con.close() + +    def CheckFuncErrorOnCreate(self): +        try: +            self.con.create_function("bla", -100, lambda x: 2*x) +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError: +            pass + +    def CheckFuncRefCount(self): +        def getfunc(): +            def f(): +                return 1 +            return f +        f = getfunc() +        globals()["foo"] = f +        # self.con.create_function("reftest", 0, getfunc()) +        self.con.create_function("reftest", 0, f) +        cur = self.con.cursor() +        cur.execute("select reftest()") + +    def CheckFuncReturnText(self): +        cur = self.con.cursor() +        cur.execute("select returntext()") +        val = cur.fetchone()[0] +        self.assertEqual(type(val), unicode) +        self.assertEqual(val, "foo") + +    def CheckFuncReturnUnicode(self): +        cur = self.con.cursor() +        cur.execute("select returnunicode()") +        val = cur.fetchone()[0] +        self.assertEqual(type(val), unicode) +        self.assertEqual(val, u"bar") + +    def CheckFuncReturnInt(self): +        cur = self.con.cursor() +        cur.execute("select returnint()") +        val = cur.fetchone()[0] +        self.assertEqual(type(val), int) +        self.assertEqual(val, 42) + +    def CheckFuncReturnFloat(self): +        cur = self.con.cursor() +        cur.execute("select returnfloat()") +        val = cur.fetchone()[0] +        self.assertEqual(type(val), float) +        if val < 3.139 or val > 3.141: +            self.fail("wrong value") + +    def CheckFuncReturnNull(self): +        cur = self.con.cursor() +        cur.execute("select returnnull()") +        val = cur.fetchone()[0] +        self.assertEqual(type(val), type(None)) +        self.assertEqual(val, None) + +    def CheckFuncReturnBlob(self): +        cur = self.con.cursor() +        cur.execute("select returnblob()") +        val = cur.fetchone()[0] +        self.assertEqual(type(val), buffer) +        self.assertEqual(val, buffer("blob")) + +    def CheckFuncException(self): +        cur = self.con.cursor() +        try: +            cur.execute("select raiseexception()") +            cur.fetchone() +            self.fail("should have raised OperationalError") +        except sqlite.OperationalError, e: +            self.assertEqual(e.args[0], 'user-defined function raised exception') + +    def CheckParamString(self): +        cur = self.con.cursor() +        cur.execute("select isstring(?)", ("foo",)) +        val = cur.fetchone()[0] +        self.assertEqual(val, 1) + +    def CheckParamInt(self): +        cur = self.con.cursor() +        cur.execute("select isint(?)", (42,)) +        val = cur.fetchone()[0] +        self.assertEqual(val, 1) + +    def CheckParamFloat(self): +        cur = self.con.cursor() +        cur.execute("select isfloat(?)", (3.14,)) +        val = cur.fetchone()[0] +        self.assertEqual(val, 1) + +    def CheckParamNone(self): +        cur = self.con.cursor() +        cur.execute("select isnone(?)", (None,)) +        val = cur.fetchone()[0] +        self.assertEqual(val, 1) + +    def CheckParamBlob(self): +        cur = self.con.cursor() +        cur.execute("select isblob(?)", (buffer("blob"),)) +        val = cur.fetchone()[0] +        self.assertEqual(val, 1) + +class AggregateTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") +        cur = self.con.cursor() +        cur.execute(""" +            create table test( +                t text, +                i integer, +                f float, +                n, +                b blob +                ) +            """) +        cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)", +            ("foo", 5, 3.14, None, buffer("blob"),)) + +        self.con.create_aggregate("nostep", 1, AggrNoStep) +        self.con.create_aggregate("nofinalize", 1, AggrNoFinalize) +        self.con.create_aggregate("excInit", 1, AggrExceptionInInit) +        self.con.create_aggregate("excStep", 1, AggrExceptionInStep) +        self.con.create_aggregate("excFinalize", 1, AggrExceptionInFinalize) +        self.con.create_aggregate("checkType", 2, AggrCheckType) +        self.con.create_aggregate("mysum", 1, AggrSum) + +    def tearDown(self): +        #self.cur.close() +        #self.con.close() +        pass + +    def CheckAggrErrorOnCreate(self): +        try: +            self.con.create_function("bla", -100, AggrSum) +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError: +            pass + +    def CheckAggrNoStep(self): +        cur = self.con.cursor() +        try: +            cur.execute("select nostep(t) from test") +            self.fail("should have raised an AttributeError") +        except AttributeError, e: +            self.assertEqual(e.args[0], "AggrNoStep instance has no attribute 'step'") + +    def CheckAggrNoFinalize(self): +        cur = self.con.cursor() +        try: +            cur.execute("select nofinalize(t) from test") +            val = cur.fetchone()[0] +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError, e: +            self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") + +    def CheckAggrExceptionInInit(self): +        cur = self.con.cursor() +        try: +            cur.execute("select excInit(t) from test") +            val = cur.fetchone()[0] +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError, e: +            self.assertEqual(e.args[0], "user-defined aggregate's '__init__' method raised error") + +    def CheckAggrExceptionInStep(self): +        cur = self.con.cursor() +        try: +            cur.execute("select excStep(t) from test") +            val = cur.fetchone()[0] +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError, e: +            self.assertEqual(e.args[0], "user-defined aggregate's 'step' method raised error") + +    def CheckAggrExceptionInFinalize(self): +        cur = self.con.cursor() +        try: +            cur.execute("select excFinalize(t) from test") +            val = cur.fetchone()[0] +            self.fail("should have raised an OperationalError") +        except sqlite.OperationalError, e: +            self.assertEqual(e.args[0], "user-defined aggregate's 'finalize' method raised error") + +    def CheckAggrCheckParamStr(self): +        cur = self.con.cursor() +        cur.execute("select checkType('str', ?)", ("foo",)) +        val = cur.fetchone()[0] +        self.assertEqual(val, 1) + +    def CheckAggrCheckParamInt(self): +        cur = self.con.cursor() +        cur.execute("select checkType('int', ?)", (42,)) +        val = cur.fetchone()[0] +        self.assertEqual(val, 1) + +    def CheckAggrCheckParamFloat(self): +        cur = self.con.cursor() +        cur.execute("select checkType('float', ?)", (3.14,)) +        val = cur.fetchone()[0] +        self.assertEqual(val, 1) + +    def CheckAggrCheckParamNone(self): +        cur = self.con.cursor() +        cur.execute("select checkType('None', ?)", (None,)) +        val = cur.fetchone()[0] +        self.assertEqual(val, 1) + +    def CheckAggrCheckParamBlob(self): +        cur = self.con.cursor() +        cur.execute("select checkType('blob', ?)", (buffer("blob"),)) +        val = cur.fetchone()[0] +        self.assertEqual(val, 1) + +    def CheckAggrCheckAggrSum(self): +        cur = self.con.cursor() +        cur.execute("delete from test") +        cur.executemany("insert into test(i) values (?)", [(10,), (20,), (30,)]) +        cur.execute("select mysum(i) from test") +        val = cur.fetchone()[0] +        self.assertEqual(val, 60) + +def authorizer_cb(action, arg1, arg2, dbname, source): +    if action != sqlite.SQLITE_SELECT: +        return sqlite.SQLITE_DENY +    if arg2 == 'c2' or arg1 == 't2': +        return sqlite.SQLITE_DENY +    return sqlite.SQLITE_OK + +class AuthorizerTests(unittest.TestCase): +    def setUp(self): +        self.con = sqlite.connect(":memory:") +        self.con.executescript(""" +            create table t1 (c1, c2); +            create table t2 (c1, c2); +            insert into t1 (c1, c2) values (1, 2); +            insert into t2 (c1, c2) values (4, 5); +            """) + +        # For our security test: +        self.con.execute("select c2 from t2") + +        self.con.set_authorizer(authorizer_cb) + +    def tearDown(self): +        pass + +    def CheckTableAccess(self): +        try: +            self.con.execute("select * from t2") +        except sqlite.DatabaseError, e: +            if not e.args[0].endswith("prohibited"): +                self.fail("wrong exception text: %s" % e.args[0]) +            return +        self.fail("should have raised an exception due to missing privileges") + +    def CheckColumnAccess(self): +        try: +            self.con.execute("select c2 from t1") +        except sqlite.DatabaseError, e: +            if not e.args[0].endswith("prohibited"): +                self.fail("wrong exception text: %s" % e.args[0]) +            return +        self.fail("should have raised an exception due to missing privileges") + +def suite(): +    function_suite = unittest.makeSuite(FunctionTests, "Check") +    aggregate_suite = unittest.makeSuite(AggregateTests, "Check") +    authorizer_suite = unittest.makeSuite(AuthorizerTests, "Check") +    return unittest.TestSuite((function_suite, aggregate_suite, authorizer_suite)) + +def test(): +    runner = unittest.TextTestRunner() +    runner.run(suite()) + +if __name__ == "__main__": +    test() | 
