1#-*- coding: iso-8859-1 -*-
2# pysqlite2/test/regression.py: pysqlite regression tests
3#
# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import datetime
25import unittest
26import sqlite3 as sqlite
27import weakref
28from test import support
29
class RegressionTests(unittest.TestCase):
    """Regression tests for bugs previously found in pysqlite / sqlite3.

    Test methods use the ``Check`` prefix instead of ``test``.  ``setUp``
    opens a fresh in-memory database as ``self.con`` and ``tearDown`` closes
    it; tests that need a specially configured connection open their own and
    register it for closing.
    """

    def setUp(self):
        """Open the shared in-memory connection used by most tests."""
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        """Close the shared connection opened in ``setUp``."""
        self.con.close()

    def CheckPragmaUserVersion(self):
        """Executing ``pragma user_version`` must not crash."""
        # This used to crash pysqlite because this pragma command returns
        # NULL for the column name.
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def CheckPragmaSchemaVersion(self):
        """``pragma schema_version`` must work with PARSE_COLNAMES enabled."""
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            # The cursor must come from the PARSE_COLNAMES connection, not
            # from self.con, otherwise the column-name parsing path is never
            # exercised.
            cur = con.cursor()
            try:
                cur.execute("pragma schema_version")
            finally:
                cur.close()
        finally:
            con.close()

    def CheckStatementReset(self):
        """Rolling back must work while several cursors hold statements."""
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements
        # are reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection
        # object.
        con = sqlite.connect(":memory:", cached_statements=5)
        self.addCleanup(con.close)
        cursors = [con.cursor() for _ in range(5)]
        cursors[0].execute("create table test(x)")
        rows = [(x,) for x in range(10)]
        for _ in range(10):
            cursors[0].executemany("insert into test(x) values (?)", rows)

        # The varying number of leading blanks makes every cursor execute a
        # textually different SQL string.
        for i, cur in enumerate(cursors):
            cur.execute(" " * i + "select x from test")

        con.rollback()

    def CheckColumnNameWithSpaces(self):
        """Column names containing blanks must be reported in full."""
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def CheckStatementFinalizationOnCloseDb(self):
        """Closing a connection must succeed with many cursors still alive."""
        # pysqlite versions <= 2.3.3 only finalized statements in the
        # statement cache when closing the database. Statements that were
        # still referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        con.close()

    def CheckOnConflictRollback(self):
        """commit() must work after an implicit ON CONFLICT ROLLBACK."""
        if sqlite.sqlite_version_info < (3, 2, 2):
            self.skipTest("requires SQLite 3.2.2 or newer")
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        # The duplicate value violates the unique constraint, which makes
        # SQLite roll the transaction back on its own.
        with self.assertRaises(sqlite.DatabaseError):
            con.execute("insert into foo(x) values (1)")
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def CheckWorkaroundForBuggySqliteTransferBindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def CheckEmptyStatement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return
        NULL for "no-operation" statements.
        """
        self.con.execute("")

    def CheckUnicodeConnect(self):
        """
        With pysqlite 2.4.0 you needed to use a string or an APSW connection
        object for opening database connections.

        Formerly, both bytestrings and unicode strings used to work.

        Let's make sure unicode strings work in the future.
        """
        con = sqlite.connect(u":memory:")
        con.close()

    def CheckTypeMapUsage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)",
                    (datetime.datetime.now(),))
        con.execute(SELECT)
        # Same SQL text, but the table is recreated with another declared
        # column type before it is executed again.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(SELECT)

    def CheckRegisterAdapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def CheckSetIsolationLevel(self):
        """
        See issue 3312.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        self.assertRaises(UnicodeEncodeError, setattr, con,
                          "isolation_level", u"\xe9")

    def CheckCursorConstructorCallCheck(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                # Deliberately does not call sqlite.Cursor.__init__.
                pass

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()
        with self.assertRaises(sqlite.ProgrammingError) as caught:
            cur.close()
        self.assertEqual(str(caught.exception),
                         "Base Cursor.__init__ not called.")

    def CheckConnectionConstructorCallCheck(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                # Deliberately does not call sqlite.Connection.__init__.
                pass

        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckCursorRegistration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too.  (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def CheckAutoCommit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        con = sqlite.connect(":memory:", isolation_level=None)
        con.close()

    def CheckPragmaAutocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only fetching must succeed; the value itself is irrelevant.
        cur.fetchone()

    def CheckSetDict(self):
        """
        See http://bugs.python.org/issue7478

        It was possible to successfully register callbacks that could not be
        hashed. Return codes of PyDict_SetItem were not checked properly.
        """
        class NotHashable:
            def __call__(self, *args, **kw):
                pass
            def __hash__(self):
                raise TypeError()
        var = NotHashable()
        # NOTE(review): create_function/create_aggregate are called with a
        # single argument here, so their TypeError may come from argument
        # parsing rather than from hashing -- confirm before relying on it.
        self.assertRaises(TypeError, self.con.create_function, var)
        self.assertRaises(TypeError, self.con.create_aggregate, var)
        self.assertRaises(TypeError, self.con.set_authorizer, var)
        self.assertRaises(TypeError, self.con.set_progress_handler, var)

    def CheckConnectionCall(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(sqlite.Warning, self.con, 1)

    def CheckRecursiveCursorUse(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator
        led to segfaults.  Now we catch recursive cursor usage and raise a
        ProgrammingError.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Re-enters the cursor that is currently running executemany().
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def CheckConvertTimestampMicrosecondPadding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute(
            "INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def CheckInvalidIsolationLevelType(self):
        """Passing a non-string isolation level must raise TypeError."""
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def CheckNullCharacter(self):
        """SQL text containing a NUL character must be rejected."""
        # Issue #21147
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def CheckCommitCursorReset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        # Commit in the middle of iterating and record what the still-open
        # select cursor hands out.
        fetched = []
        for i, row in enumerate(con.execute("select c from t")):
            con.execute("insert into t2(c) values (?)", (i,))
            con.commit()
            fetched.append(row[0])
        self.assertEqual(fetched, [0, 1, 2],
                         "should have returned exactly three rows")

    def CheckBpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = sqlite.Cursor(con)
        ref = weakref.ref(cur, callback)
        cur.__init__(con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        support.gc_collect()
381
382
def suite():
    """Return a TestSuite with every ``Check*`` method of RegressionTests.

    A TestLoader with a custom method prefix is used instead of
    ``unittest.makeSuite``, which is deprecated and no longer available in
    recent Python versions.
    """
    loader = unittest.TestLoader()
    # The tests in this module are named Check*, not test*.
    loader.testMethodPrefix = "Check"
    regression_suite = loader.loadTestsFromTestCase(RegressionTests)
    return unittest.TestSuite((regression_suite,))
386
def test():
    """Run the whole regression suite and print results as plain text."""
    unittest.TextTestRunner().run(suite())


# Allow running this module directly as a script.
if __name__ == "__main__":
    test()
393