1#-*- coding: iso-8859-1 -*-
2# pysqlite2/test/regression.py: pysqlite regression tests
3#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import datetime
25import unittest
26import sqlite3 as sqlite
27import weakref
28from test import support
29
class RegressionTests(unittest.TestCase):
    """
    Regression tests for crashes and misbehaviour reported against
    pysqlite / sqlite3.

    setUp() opens a fresh in-memory connection as ``self.con`` and tearDown()
    closes it.  Tests that need particular connection options open their own
    connection instead.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def CheckPragmaUserVersion(self):
        # This used to crash pysqlite because this pragma command returns NULL for the column name
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            # Run the pragma on the PARSE_COLNAMES connection created above,
            # not on self.con, so the column-name parsing path is exercised.
            cur = con.cursor()
            try:
                cur.execute("pragma schema_version")
            finally:
                cur.close()
        finally:
            con.close()

    def CheckStatementReset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection object.
        con = sqlite.connect(":memory:", cached_statements=5)
        self.addCleanup(con.close)
        cursors = [con.cursor() for _ in range(5)]
        cursors[0].execute("create table test(x)")
        for _ in range(10):
            cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])

        # A different amount of leading whitespace gives every cursor its own
        # distinct SQL text.
        for i, cur in enumerate(cursors):
            cur.execute(" " * i + "select x from test")

        con.rollback()

    def CheckColumnNameWithSpaces(self):
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def CheckStatementFinalizationOnCloseDb(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the statement
        # cache when closing the database. Statements that were still
        # referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            # Keep every cursor referenced so its statement is still alive
            # when the connection is closed.
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        con.close()

    @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
    def CheckOnConflictRollback(self):
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        try:
            # Violates the unique constraint declared "on conflict rollback".
            con.execute("insert into foo(x) values (1)")
        except sqlite.DatabaseError:
            pass
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def CheckWorkaroundForBuggySqliteTransferBindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def CheckEmptyStatement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def CheckTypeMapUsage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
        con.execute(SELECT)
        # Recreate the table with a different declared column type and run
        # the very same SELECT text again.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(SELECT)

    def CheckErrorMsgDecodeError(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            # Byte 250 on its own is not valid UTF-8.
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (bytes(bytearray([250])),)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def CheckRegisterAdapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def CheckSetIsolationLevel(self):
        # See issue 27881.
        class CustomStr(str):
            def upper(self):
                return None
            def __del__(self):
                # Re-enters the isolation_level setter while the instance is
                # being destroyed.
                con.isolation_level = ""

        # This connection is intentionally not closed explicitly: the
        # CustomStr finalizer above assigns to it whenever an instance dies.
        con = sqlite.connect(":memory:")
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def CheckCursorConstructorCallCheck(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                # Deliberately skip sqlite.Cursor.__init__().
                pass

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()
        with self.assertRaisesRegex(sqlite.ProgrammingError,
                                    r'^Base Cursor\.__init__ not called\.$'):
            cur.close()

    def CheckStrSubclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses of str.
        """
        class MyStr(str): pass
        self.con.execute("select ?", (MyStr("abc"),))

    def CheckConnectionConstructorCallCheck(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                # Deliberately skip sqlite.Connection.__init__().
                pass

        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckCursorRegistration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too.  (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def CheckAutoCommit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        con = sqlite.connect(":memory:", isolation_level=None)
        self.addCleanup(con.close)

    def CheckPragmaAutocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only the fetch succeeding matters; the value itself is not checked.
        cur.fetchone()

    def CheckConnectionCall(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(sqlite.Warning, self.con, 1)

    def CheckCollation(self):
        def collation_cb(a, b):
            return 1
        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def CheckRecursiveCursorUse(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Uses the same cursor that is consuming this generator.
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def CheckConvertTimestampMicrosecondPadding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def CheckInvalidIsolationLevelType(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)


    def CheckNullCharacter(self):
        # Issue #21147
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def CheckCommitCursorReset(self):
        """
        Connection.commit() used to reset cursors, which made sqlite3
        return rows multiple times when fetching from cursors
        after a commit. See issues 10513 and 23129 for details.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        expected_values = (0, 1, 2)
        counter = 0
        for i, row in enumerate(con.execute("select c from t")):
            with self.subTest(i=i, row=row):
                # Commit while the SELECT cursor is still being iterated.
                con.execute("insert into t2(c) values (?)", (i,))
                con.commit()
                # Any row beyond the third is reported by the count check
                # after the loop rather than here.
                if counter < len(expected_values):
                    self.assertEqual(row[0], expected_values[counter])
                counter += 1
        self.assertEqual(counter, 3, "should have returned exactly three rows")

    def CheckBpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = sqlite.Cursor(con)
        ref = weakref.ref(cur, callback)
        cur.__init__(con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        support.gc_collect()

    def CheckDelIsolation_levelSegfault(self):
        with self.assertRaises(AttributeError):
            del self.con.isolation_level
385
386
class UnhashableFunc:
    """
    Callable stand-in that cannot be hashed.

    Each call is counted in ``calls`` and answers with the ``return_value``
    given to the constructor (None by default).
    """

    # Assigning None to __hash__ makes hash(instance) raise TypeError.
    __hash__ = None

    def __init__(self, return_value=None):
        self.return_value = return_value
        self.calls = 0

    def __call__(self, *args, **kwargs):
        # Arguments are accepted but ignored; only the call is recorded.
        self.calls = self.calls + 1
        return self.return_value
397
398
class UnhashableCallbacksTestCase(unittest.TestCase):
    """
    https://bugs.python.org/issue34052

    Registering unhashable callbacks raises TypeError, callbacks are not
    registered in SQLite after such registration attempt.
    """
    def setUp(self):
        self.con = sqlite.connect(':memory:')

    def tearDown(self):
        self.con.close()

    def _rejects_unhashable(self):
        # Context manager shared by all tests: the registration call inside
        # it must fail with the "unhashable type" TypeError.
        return self.assertRaisesRegex(TypeError, 'unhashable type')

    def _assert_not_registered(self, name):
        # Calling the SQL function `name` must fail because it is unknown.
        expected = 'no such function: %s' % name
        with self.assertRaisesRegex(sqlite.OperationalError, expected):
            self.con.execute('SELECT %s()' % name)

    def test_progress_handler(self):
        handler = UnhashableFunc(return_value=0)
        with self._rejects_unhashable():
            self.con.set_progress_handler(handler, 1)
        self.con.execute('SELECT 1')
        self.assertFalse(handler.calls)

    def test_func(self):
        func_name = 'func_name'
        func = UnhashableFunc()
        with self._rejects_unhashable():
            self.con.create_function(func_name, 0, func)
        self._assert_not_registered(func_name)
        self.assertFalse(func.calls)

    def test_authorizer(self):
        authorizer = UnhashableFunc(return_value=sqlite.SQLITE_DENY)
        with self._rejects_unhashable():
            self.con.set_authorizer(authorizer)
        self.con.execute('SELECT 1')
        self.assertFalse(authorizer.calls)

    def test_aggr(self):
        # A metaclass with __hash__ = None yields a class object that is
        # itself unhashable.
        class UnhashableType(type):
            __hash__ = None
        aggr_name = 'aggr_name'
        aggregate_cls = UnhashableType('Aggr', (), {})
        with self._rejects_unhashable():
            self.con.create_aggregate(aggr_name, 0, aggregate_cls)
        self._assert_not_registered(aggr_name)
445
446
def suite():
    """
    Build the combined test suite for this module.

    Returns a unittest.TestSuite holding two sub-suites, in this order: the
    ``Check``-prefixed methods of RegressionTests, then the default
    ``test``-prefixed methods of UnhashableCallbacksTestCase.
    """
    # unittest.makeSuite() is deprecated and was removed in Python 3.13;
    # a TestLoader with a custom method prefix collects the same tests.
    check_loader = unittest.TestLoader()
    check_loader.testMethodPrefix = "Check"
    regression_suite = check_loader.loadTestsFromTestCase(RegressionTests)
    callbacks_suite = unittest.TestLoader().loadTestsFromTestCase(
        UnhashableCallbacksTestCase)
    return unittest.TestSuite((
        regression_suite,
        callbacks_suite,
    ))
453
def test():
    """Run everything returned by suite() with a default text runner."""
    unittest.TextTestRunner().run(suite())
457
# When executed as a script (rather than imported), run the module's tests.
if __name__ == "__main__":
    test()
460