# -*- coding: utf-8 -*-
# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

"""Regression tests for the sqlite3 (pysqlite) module.

``RegressionTests`` uses the ``Check`` method prefix, so it has to be loaded
through :func:`suite` (or a loader configured with that prefix);
``UnhashableCallbacksTestCase`` uses the default ``test`` prefix.
"""

import datetime
import unittest
import sqlite3 as sqlite
import weakref
from test import support


class RegressionTests(unittest.TestCase):
    """Collection of regression tests for previously reported bugs.

    ``setUp`` provides a fresh in-memory database as ``self.con``.  Tests
    that need special connection options open a private connection through
    ``_connect`` so that it is closed again when the test finishes.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def _connect(self, **kwargs):
        # Open a private in-memory connection and make sure it gets closed
        # even if the test fails half-way through.
        con = sqlite.connect(":memory:", **kwargs)
        self.addCleanup(con.close)
        return con

    def CheckPragmaUserVersion(self):
        # This used to crash pysqlite because this pragma command returns
        # NULL for the column name
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = self._connect(detect_types=sqlite.PARSE_COLNAMES)
        # The cursor must come from the PARSE_COLNAMES connection opened
        # above; it is created before the try block so that the finally
        # clause never sees an unbound name.
        cur = con.cursor()
        try:
            cur.execute("pragma schema_version")
        finally:
            cur.close()

    def CheckStatementReset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection
        # object.
        con = self._connect(cached_statements=5)
        cursors = [con.cursor() for _ in range(5)]
        cursors[0].execute("create table test(x)")
        for _ in range(10):
            cursors[0].executemany("insert into test(x) values (?)",
                                   [(x,) for x in range(10)])

        # The varying amount of leading whitespace makes every statement
        # text distinct.
        for i, cur in enumerate(cursors):
            cur.execute(" " * i + "select x from test")

        con.rollback()

    def CheckColumnNameWithSpaces(self):
        # A "[datetime]" suffix in the column alias is not reported as part
        # of the column name; embedded spaces are preserved.
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def CheckStatementFinalizationOnCloseDb(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the statement
        # cache when closing the database. statements that were still
        # referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        # Closing explicitly is the operation under test here.
        con.close()

    @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
    def CheckOnConflictRollback(self):
        # After a conflicting insert on an "on conflict rollback" table the
        # following commit() must not raise.
        con = self._connect()
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        try:
            con.execute("insert into foo(x) values (1)")
        except sqlite.DatabaseError:
            # Deliberately ignored: the duplicate value is what triggers the
            # conflict handling this test is about.
            pass
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def CheckWorkaroundForBuggySqliteTransferBindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def CheckEmptyStatement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def CheckTypeMapUsage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = self._connect(detect_types=sqlite.PARSE_DECLTYPES)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
        con.execute(SELECT)
        # Same statement text, but the column now has a different declared
        # type.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(SELECT)

    def CheckErrorMsgDecodeError(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (bytes(bytearray([250])),)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def CheckRegisterAdapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def CheckSetIsolationLevel(self):
        # See issue 27881.
        class CustomStr(str):
            def upper(self):
                return None

            def __del__(self):
                # Re-enters the isolation_level setter while the instance is
                # being destroyed.
                con.isolation_level = ""

        con = self._connect()
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def CheckCursorConstructorCallCheck(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                # Intentionally does not call sqlite.Cursor.__init__().
                pass

        con = self._connect()
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()
        with self.assertRaisesRegex(sqlite.ProgrammingError,
                                    r'^Base Cursor\.__init__ not called\.$'):
            cur.close()

    def CheckStrSubclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses of str.
        """
        class MyStr(str):
            pass

        self.con.execute("select ?", (MyStr("abc"),))

    def CheckConnectionConstructorCallCheck(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                # Intentionally does not call sqlite.Connection.__init__().
                pass

        # This connection was never initialised, so it is not closed here.
        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckCursorRegistration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too.  (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def CheckAutoCommit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        self._connect(isolation_level=None)

    def CheckPragmaAutocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only the fetch itself matters; the value is not inspected.
        cur.fetchone()

    def CheckConnectionCall(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(sqlite.Warning, self.con, 1)

    def CheckCollation(self):
        # An unencodable collation name must be rejected with
        # ProgrammingError.
        def collation_cb(a, b):
            return 1

        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def CheckRecursiveCursorUse(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        con = self._connect()

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Runs while executemany() below is still using the same cursor.
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def CheckConvertTimestampMicrosecondPadding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        con = self._connect(detect_types=sqlite.PARSE_DECLTYPES)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def CheckInvalidIsolationLevelType(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def CheckNullCharacter(self):
        # Issue #21147
        con = self._connect()
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def CheckCommitCursorReset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = self._connect()
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        counter = 0
        for i, row in enumerate(con.execute("select c from t")):
            with self.subTest(i=i, row=row):
                con.execute("insert into t2(c) values (?)", (i,))
                con.commit()
                # The n-th row that passes must hold the value n.  The
                # counter only advances when the check passes, so it can
                # differ from ``i`` once a row has failed.
                if counter < 3:
                    self.assertEqual(row[0], counter)
                counter += 1
        self.assertEqual(counter, 3, "should have returned exactly three rows")

    def CheckBpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass

        con = self._connect()
        cur = sqlite.Cursor(con)
        ref = weakref.ref(cur, callback)
        cur.__init__(con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        support.gc_collect()

    def CheckDelIsolation_levelSegfault(self):
        # Deleting the attribute must raise AttributeError.
        with self.assertRaises(AttributeError):
            del self.con.isolation_level


class UnhashableFunc:
    """Unhashable callable that counts its calls and returns a fixed value."""

    # Setting __hash__ to None makes instances unhashable.
    __hash__ = None

    def __init__(self, return_value=None):
        self.calls = 0
        self.return_value = return_value

    def __call__(self, *args, **kwargs):
        self.calls += 1
        return self.return_value


class UnhashableCallbacksTestCase(unittest.TestCase):
    """
    https://bugs.python.org/issue34052

    Registering unhashable callbacks raises TypeError, callbacks are not
    registered in SQLite after such registration attempt.
    """
    def setUp(self):
        self.con = sqlite.connect(':memory:')

    def tearDown(self):
        self.con.close()

    def test_progress_handler(self):
        f = UnhashableFunc(return_value=0)
        with self.assertRaisesRegex(TypeError, 'unhashable type'):
            self.con.set_progress_handler(f, 1)
        # The rejected handler must not be invoked by later statements.
        self.con.execute('SELECT 1')
        self.assertFalse(f.calls)

    def test_func(self):
        func_name = 'func_name'
        f = UnhashableFunc()
        with self.assertRaisesRegex(TypeError, 'unhashable type'):
            self.con.create_function(func_name, 0, f)
        # The function must be unknown to SQLite after the failed attempt.
        msg = 'no such function: %s' % func_name
        with self.assertRaisesRegex(sqlite.OperationalError, msg):
            self.con.execute('SELECT %s()' % func_name)
        self.assertFalse(f.calls)

    def test_authorizer(self):
        f = UnhashableFunc(return_value=sqlite.SQLITE_DENY)
        with self.assertRaisesRegex(TypeError, 'unhashable type'):
            self.con.set_authorizer(f)
        # If the denying authorizer had been installed, this would fail.
        self.con.execute('SELECT 1')
        self.assertFalse(f.calls)

    def test_aggr(self):
        # The aggregate *class* has to be unhashable, hence the metaclass.
        class UnhashableType(type):
            __hash__ = None

        aggr_name = 'aggr_name'
        with self.assertRaisesRegex(TypeError, 'unhashable type'):
            self.con.create_aggregate(aggr_name, 0, UnhashableType('Aggr', (), {}))
        msg = 'no such function: %s' % aggr_name
        with self.assertRaisesRegex(sqlite.OperationalError, msg):
            self.con.execute('SELECT %s()' % aggr_name)


def _make_suite(test_case, prefix="test"):
    # Equivalent of unittest.makeSuite(), which was deprecated in Python 3.11
    # and removed in 3.13.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = prefix
    return loader.loadTestsFromTestCase(test_case)


def suite():
    """Return a suite with every test case defined in this module."""
    return unittest.TestSuite((
        _make_suite(RegressionTests, "Check"),
        _make_suite(UnhashableCallbacksTestCase),
    ))


def test():
    """Run :func:`suite` with a text test runner."""
    runner = unittest.TextTestRunner()
    runner.run(suite())


if __name__ == "__main__":
    test()