1#-*- coding: iso-8859-1 -*- 2# pysqlite2/test/dbapi.py: tests for DB-API compliance 3# 4# Copyright (C) 2004-2010 Gerhard H�ring <gh@ghaering.de> 5# 6# This file is part of pysqlite. 7# 8# This software is provided 'as-is', without any express or implied 9# warranty. In no event will the authors be held liable for any damages 10# arising from the use of this software. 11# 12# Permission is granted to anyone to use this software for any purpose, 13# including commercial applications, and to alter it and redistribute it 14# freely, subject to the following restrictions: 15# 16# 1. The origin of this software must not be misrepresented; you must not 17# claim that you wrote the original software. If you use this software 18# in a product, an acknowledgment in the product documentation would be 19# appreciated but is not required. 20# 2. Altered source versions must be plainly marked as such, and must not be 21# misrepresented as being the original software. 22# 3. This notice may not be removed or altered from any source distribution. 23 24import threading 25import unittest 26import sqlite3 as sqlite 27 28from test.support import TESTFN, unlink 29 30 31class ModuleTests(unittest.TestCase): 32 def CheckAPILevel(self): 33 self.assertEqual(sqlite.apilevel, "2.0", 34 "apilevel is %s, should be 2.0" % sqlite.apilevel) 35 36 def CheckThreadSafety(self): 37 self.assertEqual(sqlite.threadsafety, 1, 38 "threadsafety is %d, should be 1" % sqlite.threadsafety) 39 40 def CheckParamStyle(self): 41 self.assertEqual(sqlite.paramstyle, "qmark", 42 "paramstyle is '%s', should be 'qmark'" % 43 sqlite.paramstyle) 44 45 def CheckWarning(self): 46 self.assertTrue(issubclass(sqlite.Warning, Exception), 47 "Warning is not a subclass of Exception") 48 49 def CheckError(self): 50 self.assertTrue(issubclass(sqlite.Error, Exception), 51 "Error is not a subclass of Exception") 52 53 def CheckInterfaceError(self): 54 self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), 55 "InterfaceError is not a subclass of Error") 56 57 def CheckDatabaseError(self): 58 self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), 59 "DatabaseError is not a subclass of Error") 60 61 def CheckDataError(self): 62 self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), 63 "DataError is not a subclass of DatabaseError") 64 65 def CheckOperationalError(self): 66 self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), 67 "OperationalError is not a subclass of DatabaseError") 68 69 def CheckIntegrityError(self): 70 self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), 71 "IntegrityError is not a subclass of DatabaseError") 72 73 def CheckInternalError(self): 74 self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), 75 "InternalError is not a subclass of DatabaseError") 76 77 def CheckProgrammingError(self): 78 self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), 79 "ProgrammingError is not a subclass of DatabaseError") 80 81 def CheckNotSupportedError(self): 82 self.assertTrue(issubclass(sqlite.NotSupportedError, 83 sqlite.DatabaseError), 84 "NotSupportedError is not a subclass of DatabaseError") 85 86class ConnectionTests(unittest.TestCase): 87 88 def setUp(self): 89 self.cx = sqlite.connect(":memory:") 90 cu = self.cx.cursor() 91 cu.execute("create table test(id integer primary key, name text)") 92 cu.execute("insert into test(name) values (?)", ("foo",)) 93 94 def tearDown(self): 95 self.cx.close() 96 97 def CheckCommit(self): 98 self.cx.commit() 99 100 def CheckCommitAfterNoChanges(self): 101 """ 102 A commit should also work when no changes were made to the database. 103 """ 104 self.cx.commit() 105 self.cx.commit() 106 107 def CheckRollback(self): 108 self.cx.rollback() 109 110 def CheckRollbackAfterNoChanges(self): 111 """ 112 A rollback should also work when no changes were made to the database. 113 """ 114 self.cx.rollback() 115 self.cx.rollback() 116 117 def CheckCursor(self): 118 cu = self.cx.cursor() 119 120 def CheckFailedOpen(self): 121 YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" 122 with self.assertRaises(sqlite.OperationalError): 123 con = sqlite.connect(YOU_CANNOT_OPEN_THIS) 124 125 def CheckClose(self): 126 self.cx.close() 127 128 def CheckExceptions(self): 129 # Optional DB-API extension. 130 self.assertEqual(self.cx.Warning, sqlite.Warning) 131 self.assertEqual(self.cx.Error, sqlite.Error) 132 self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError) 133 self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError) 134 self.assertEqual(self.cx.DataError, sqlite.DataError) 135 self.assertEqual(self.cx.OperationalError, sqlite.OperationalError) 136 self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError) 137 self.assertEqual(self.cx.InternalError, sqlite.InternalError) 138 self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) 139 self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) 140 141 def CheckInTransaction(self): 142 # Can't use db from setUp because we want to test initial state. 143 cx = sqlite.connect(":memory:") 144 cu = cx.cursor() 145 self.assertEqual(cx.in_transaction, False) 146 cu.execute("create table transactiontest(id integer primary key, name text)") 147 self.assertEqual(cx.in_transaction, False) 148 cu.execute("insert into transactiontest(name) values (?)", ("foo",)) 149 self.assertEqual(cx.in_transaction, True) 150 cu.execute("select name from transactiontest where name=?", ["foo"]) 151 row = cu.fetchone() 152 self.assertEqual(cx.in_transaction, True) 153 cx.commit() 154 self.assertEqual(cx.in_transaction, False) 155 cu.execute("select name from transactiontest where name=?", ["foo"]) 156 row = cu.fetchone() 157 self.assertEqual(cx.in_transaction, False) 158 159 def CheckInTransactionRO(self): 160 with self.assertRaises(AttributeError): 161 self.cx.in_transaction = True 162 163 def CheckOpenWithPathLikeObject(self): 164 """ Checks that we can successfully connect to a database using an object that 165 is PathLike, i.e. has __fspath__(). """ 166 self.addCleanup(unlink, TESTFN) 167 class Path: 168 def __fspath__(self): 169 return TESTFN 170 path = Path() 171 with sqlite.connect(path) as cx: 172 cx.execute('create table test(id integer)') 173 174 def CheckOpenUri(self): 175 if sqlite.sqlite_version_info < (3, 7, 7): 176 with self.assertRaises(sqlite.NotSupportedError): 177 sqlite.connect(':memory:', uri=True) 178 return 179 self.addCleanup(unlink, TESTFN) 180 with sqlite.connect(TESTFN) as cx: 181 cx.execute('create table test(id integer)') 182 with sqlite.connect('file:' + TESTFN, uri=True) as cx: 183 cx.execute('insert into test(id) values(0)') 184 with sqlite.connect('file:' + TESTFN + '?mode=ro', uri=True) as cx: 185 with self.assertRaises(sqlite.OperationalError): 186 cx.execute('insert into test(id) values(1)') 187 188 @unittest.skipIf(sqlite.sqlite_version_info >= (3, 3, 1), 189 'needs sqlite versions older than 3.3.1') 190 def CheckSameThreadErrorOnOldVersion(self): 191 with self.assertRaises(sqlite.NotSupportedError) as cm: 192 sqlite.connect(':memory:', check_same_thread=False) 193 self.assertEqual(str(cm.exception), 'shared connections not available') 194 195class CursorTests(unittest.TestCase): 196 def setUp(self): 197 self.cx = sqlite.connect(":memory:") 198 self.cu = self.cx.cursor() 199 self.cu.execute( 200 "create table test(id integer primary key, name text, " 201 "income number, unique_test text unique)" 202 ) 203 self.cu.execute("insert into test(name) values (?)", ("foo",)) 204 205 def tearDown(self): 206 self.cu.close() 207 self.cx.close() 208 209 def CheckExecuteNoArgs(self): 210 self.cu.execute("delete from test") 211 212 def CheckExecuteIllegalSql(self): 213 with self.assertRaises(sqlite.OperationalError): 214 self.cu.execute("select asdf") 215 216 def CheckExecuteTooMuchSql(self): 217 with self.assertRaises(sqlite.Warning): 218 self.cu.execute("select 5+4; select 4+5") 219 220 def CheckExecuteTooMuchSql2(self): 221 self.cu.execute("select 5+4; -- foo bar") 222 223 def CheckExecuteTooMuchSql3(self): 224 self.cu.execute(""" 225 select 5+4; 226 227 /* 228 foo 229 */ 230 """) 231 232 def CheckExecuteWrongSqlArg(self): 233 with self.assertRaises(ValueError): 234 self.cu.execute(42) 235 236 def CheckExecuteArgInt(self): 237 self.cu.execute("insert into test(id) values (?)", (42,)) 238 239 def CheckExecuteArgFloat(self): 240 self.cu.execute("insert into test(income) values (?)", (2500.32,)) 241 242 def CheckExecuteArgString(self): 243 self.cu.execute("insert into test(name) values (?)", ("Hugo",)) 244 245 def CheckExecuteArgStringWithZeroByte(self): 246 self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",)) 247 248 self.cu.execute("select name from test where id=?", (self.cu.lastrowid,)) 249 row = self.cu.fetchone() 250 self.assertEqual(row[0], "Hu\x00go") 251 252 def CheckExecuteNonIterable(self): 253 with self.assertRaises(ValueError) as cm: 254 self.cu.execute("insert into test(id) values (?)", 42) 255 self.assertEqual(str(cm.exception), 'parameters are of unsupported type') 256 257 def CheckExecuteWrongNoOfArgs1(self): 258 # too many parameters 259 with self.assertRaises(sqlite.ProgrammingError): 260 self.cu.execute("insert into test(id) values (?)", (17, "Egon")) 261 262 def CheckExecuteWrongNoOfArgs2(self): 263 # too little parameters 264 with self.assertRaises(sqlite.ProgrammingError): 265 self.cu.execute("insert into test(id) values (?)") 266 267 def CheckExecuteWrongNoOfArgs3(self): 268 # no parameters, parameters are needed 269 with self.assertRaises(sqlite.ProgrammingError): 270 self.cu.execute("insert into test(id) values (?)") 271 272 def CheckExecuteParamList(self): 273 self.cu.execute("insert into test(name) values ('foo')") 274 self.cu.execute("select name from test where name=?", ["foo"]) 275 row = self.cu.fetchone() 276 self.assertEqual(row[0], "foo") 277 278 def CheckExecuteParamSequence(self): 279 class L(object): 280 def __len__(self): 281 return 1 282 def __getitem__(self, x): 283 assert x == 0 284 return "foo" 285 286 self.cu.execute("insert into test(name) values ('foo')") 287 self.cu.execute("select name from test where name=?", L()) 288 row = self.cu.fetchone() 289 self.assertEqual(row[0], "foo") 290 291 def CheckExecuteDictMapping(self): 292 self.cu.execute("insert into test(name) values ('foo')") 293 self.cu.execute("select name from test where name=:name", {"name": "foo"}) 294 row = self.cu.fetchone() 295 self.assertEqual(row[0], "foo") 296 297 def CheckExecuteDictMapping_Mapping(self): 298 class D(dict): 299 def __missing__(self, key): 300 return "foo" 301 302 self.cu.execute("insert into test(name) values ('foo')") 303 self.cu.execute("select name from test where name=:name", D()) 304 row = self.cu.fetchone() 305 self.assertEqual(row[0], "foo") 306 307 def CheckExecuteDictMappingTooLittleArgs(self): 308 self.cu.execute("insert into test(name) values ('foo')") 309 with self.assertRaises(sqlite.ProgrammingError): 310 self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) 311 312 def CheckExecuteDictMappingNoArgs(self): 313 self.cu.execute("insert into test(name) values ('foo')") 314 with self.assertRaises(sqlite.ProgrammingError): 315 self.cu.execute("select name from test where name=:name") 316 317 def CheckExecuteDictMappingUnnamed(self): 318 self.cu.execute("insert into test(name) values ('foo')") 319 with self.assertRaises(sqlite.ProgrammingError): 320 self.cu.execute("select name from test where name=?", {"name": "foo"}) 321 322 def CheckClose(self): 323 self.cu.close() 324 325 def CheckRowcountExecute(self): 326 self.cu.execute("delete from test") 327 self.cu.execute("insert into test(name) values ('foo')") 328 self.cu.execute("insert into test(name) values ('foo')") 329 self.cu.execute("update test set name='bar'") 330 self.assertEqual(self.cu.rowcount, 2) 331 332 def CheckRowcountSelect(self): 333 """ 334 pysqlite does not know the rowcount of SELECT statements, because we 335 don't fetch all rows after executing the select statement. The rowcount 336 has thus to be -1. 337 """ 338 self.cu.execute("select 5 union select 6") 339 self.assertEqual(self.cu.rowcount, -1) 340 341 def CheckRowcountExecutemany(self): 342 self.cu.execute("delete from test") 343 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) 344 self.assertEqual(self.cu.rowcount, 3) 345 346 def CheckTotalChanges(self): 347 self.cu.execute("insert into test(name) values ('foo')") 348 self.cu.execute("insert into test(name) values ('foo')") 349 self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value') 350 351 # Checks for executemany: 352 # Sequences are required by the DB-API, iterators 353 # enhancements in pysqlite. 354 355 def CheckExecuteManySequence(self): 356 self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) 357 358 def CheckExecuteManyIterator(self): 359 class MyIter: 360 def __init__(self): 361 self.value = 5 362 363 def __next__(self): 364 if self.value == 10: 365 raise StopIteration 366 else: 367 self.value += 1 368 return (self.value,) 369 370 self.cu.executemany("insert into test(income) values (?)", MyIter()) 371 372 def CheckExecuteManyGenerator(self): 373 def mygen(): 374 for i in range(5): 375 yield (i,) 376 377 self.cu.executemany("insert into test(income) values (?)", mygen()) 378 379 def CheckExecuteManyWrongSqlArg(self): 380 with self.assertRaises(ValueError): 381 self.cu.executemany(42, [(3,)]) 382 383 def CheckExecuteManySelect(self): 384 with self.assertRaises(sqlite.ProgrammingError): 385 self.cu.executemany("select ?", [(3,)]) 386 387 def CheckExecuteManyNotIterable(self): 388 with self.assertRaises(TypeError): 389 self.cu.executemany("insert into test(income) values (?)", 42) 390 391 def CheckFetchIter(self): 392 # Optional DB-API extension. 393 self.cu.execute("delete from test") 394 self.cu.execute("insert into test(id) values (?)", (5,)) 395 self.cu.execute("insert into test(id) values (?)", (6,)) 396 self.cu.execute("select id from test order by id") 397 lst = [] 398 for row in self.cu: 399 lst.append(row[0]) 400 self.assertEqual(lst[0], 5) 401 self.assertEqual(lst[1], 6) 402 403 def CheckFetchone(self): 404 self.cu.execute("select name from test") 405 row = self.cu.fetchone() 406 self.assertEqual(row[0], "foo") 407 row = self.cu.fetchone() 408 self.assertEqual(row, None) 409 410 def CheckFetchoneNoStatement(self): 411 cur = self.cx.cursor() 412 row = cur.fetchone() 413 self.assertEqual(row, None) 414 415 def CheckArraySize(self): 416 # must default ot 1 417 self.assertEqual(self.cu.arraysize, 1) 418 419 # now set to 2 420 self.cu.arraysize = 2 421 422 # now make the query return 3 rows 423 self.cu.execute("delete from test") 424 self.cu.execute("insert into test(name) values ('A')") 425 self.cu.execute("insert into test(name) values ('B')") 426 self.cu.execute("insert into test(name) values ('C')") 427 self.cu.execute("select name from test") 428 res = self.cu.fetchmany() 429 430 self.assertEqual(len(res), 2) 431 432 def CheckFetchmany(self): 433 self.cu.execute("select name from test") 434 res = self.cu.fetchmany(100) 435 self.assertEqual(len(res), 1) 436 res = self.cu.fetchmany(100) 437 self.assertEqual(res, []) 438 439 def CheckFetchmanyKwArg(self): 440 """Checks if fetchmany works with keyword arguments""" 441 self.cu.execute("select name from test") 442 res = self.cu.fetchmany(size=100) 443 self.assertEqual(len(res), 1) 444 445 def CheckFetchall(self): 446 self.cu.execute("select name from test") 447 res = self.cu.fetchall() 448 self.assertEqual(len(res), 1) 449 res = self.cu.fetchall() 450 self.assertEqual(res, []) 451 452 def CheckSetinputsizes(self): 453 self.cu.setinputsizes([3, 4, 5]) 454 455 def CheckSetoutputsize(self): 456 self.cu.setoutputsize(5, 0) 457 458 def CheckSetoutputsizeNoColumn(self): 459 self.cu.setoutputsize(42) 460 461 def CheckCursorConnection(self): 462 # Optional DB-API extension. 463 self.assertEqual(self.cu.connection, self.cx) 464 465 def CheckWrongCursorCallable(self): 466 with self.assertRaises(TypeError): 467 def f(): pass 468 cur = self.cx.cursor(f) 469 470 def CheckCursorWrongClass(self): 471 class Foo: pass 472 foo = Foo() 473 with self.assertRaises(TypeError): 474 cur = sqlite.Cursor(foo) 475 476 def CheckLastRowIDOnReplace(self): 477 """ 478 INSERT OR REPLACE and REPLACE INTO should produce the same behavior. 479 """ 480 sql = '{} INTO test(id, unique_test) VALUES (?, ?)' 481 for statement in ('INSERT OR REPLACE', 'REPLACE'): 482 with self.subTest(statement=statement): 483 self.cu.execute(sql.format(statement), (1, 'foo')) 484 self.assertEqual(self.cu.lastrowid, 1) 485 486 def CheckLastRowIDOnIgnore(self): 487 self.cu.execute( 488 "insert or ignore into test(unique_test) values (?)", 489 ('test',)) 490 self.assertEqual(self.cu.lastrowid, 2) 491 self.cu.execute( 492 "insert or ignore into test(unique_test) values (?)", 493 ('test',)) 494 self.assertEqual(self.cu.lastrowid, 2) 495 496 def CheckLastRowIDInsertOR(self): 497 results = [] 498 for statement in ('FAIL', 'ABORT', 'ROLLBACK'): 499 sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)' 500 with self.subTest(statement='INSERT OR {}'.format(statement)): 501 self.cu.execute(sql.format(statement), (statement,)) 502 results.append((statement, self.cu.lastrowid)) 503 with self.assertRaises(sqlite.IntegrityError): 504 self.cu.execute(sql.format(statement), (statement,)) 505 results.append((statement, self.cu.lastrowid)) 506 expected = [ 507 ('FAIL', 2), ('FAIL', 2), 508 ('ABORT', 3), ('ABORT', 3), 509 ('ROLLBACK', 4), ('ROLLBACK', 4), 510 ] 511 self.assertEqual(results, expected) 512 513 514class ThreadTests(unittest.TestCase): 515 def setUp(self): 516 self.con = sqlite.connect(":memory:") 517 self.cur = self.con.cursor() 518 self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)") 519 520 def tearDown(self): 521 self.cur.close() 522 self.con.close() 523 524 def CheckConCursor(self): 525 def run(con, errors): 526 try: 527 cur = con.cursor() 528 errors.append("did not raise ProgrammingError") 529 return 530 except sqlite.ProgrammingError: 531 return 532 except: 533 errors.append("raised wrong exception") 534 535 errors = [] 536 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 537 t.start() 538 t.join() 539 if len(errors) > 0: 540 self.fail("\n".join(errors)) 541 542 def CheckConCommit(self): 543 def run(con, errors): 544 try: 545 con.commit() 546 errors.append("did not raise ProgrammingError") 547 return 548 except sqlite.ProgrammingError: 549 return 550 except: 551 errors.append("raised wrong exception") 552 553 errors = [] 554 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 555 t.start() 556 t.join() 557 if len(errors) > 0: 558 self.fail("\n".join(errors)) 559 560 def CheckConRollback(self): 561 def run(con, errors): 562 try: 563 con.rollback() 564 errors.append("did not raise ProgrammingError") 565 return 566 except sqlite.ProgrammingError: 567 return 568 except: 569 errors.append("raised wrong exception") 570 571 errors = [] 572 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 573 t.start() 574 t.join() 575 if len(errors) > 0: 576 self.fail("\n".join(errors)) 577 578 def CheckConClose(self): 579 def run(con, errors): 580 try: 581 con.close() 582 errors.append("did not raise ProgrammingError") 583 return 584 except sqlite.ProgrammingError: 585 return 586 except: 587 errors.append("raised wrong exception") 588 589 errors = [] 590 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors}) 591 t.start() 592 t.join() 593 if len(errors) > 0: 594 self.fail("\n".join(errors)) 595 596 def CheckCurImplicitBegin(self): 597 def run(cur, errors): 598 try: 599 cur.execute("insert into test(name) values ('a')") 600 errors.append("did not raise ProgrammingError") 601 return 602 except sqlite.ProgrammingError: 603 return 604 except: 605 errors.append("raised wrong exception") 606 607 errors = [] 608 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 609 t.start() 610 t.join() 611 if len(errors) > 0: 612 self.fail("\n".join(errors)) 613 614 def CheckCurClose(self): 615 def run(cur, errors): 616 try: 617 cur.close() 618 errors.append("did not raise ProgrammingError") 619 return 620 except sqlite.ProgrammingError: 621 return 622 except: 623 errors.append("raised wrong exception") 624 625 errors = [] 626 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 627 t.start() 628 t.join() 629 if len(errors) > 0: 630 self.fail("\n".join(errors)) 631 632 def CheckCurExecute(self): 633 def run(cur, errors): 634 try: 635 cur.execute("select name from test") 636 errors.append("did not raise ProgrammingError") 637 return 638 except sqlite.ProgrammingError: 639 return 640 except: 641 errors.append("raised wrong exception") 642 643 errors = [] 644 self.cur.execute("insert into test(name) values ('a')") 645 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 646 t.start() 647 t.join() 648 if len(errors) > 0: 649 self.fail("\n".join(errors)) 650 651 def CheckCurIterNext(self): 652 def run(cur, errors): 653 try: 654 row = cur.fetchone() 655 errors.append("did not raise ProgrammingError") 656 return 657 except sqlite.ProgrammingError: 658 return 659 except: 660 errors.append("raised wrong exception") 661 662 errors = [] 663 self.cur.execute("insert into test(name) values ('a')") 664 self.cur.execute("select name from test") 665 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors}) 666 t.start() 667 t.join() 668 if len(errors) > 0: 669 self.fail("\n".join(errors)) 670 671class ConstructorTests(unittest.TestCase): 672 def CheckDate(self): 673 d = sqlite.Date(2004, 10, 28) 674 675 def CheckTime(self): 676 t = sqlite.Time(12, 39, 35) 677 678 def CheckTimestamp(self): 679 ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) 680 681 def CheckDateFromTicks(self): 682 d = sqlite.DateFromTicks(42) 683 684 def CheckTimeFromTicks(self): 685 t = sqlite.TimeFromTicks(42) 686 687 def CheckTimestampFromTicks(self): 688 ts = sqlite.TimestampFromTicks(42) 689 690 def CheckBinary(self): 691 b = sqlite.Binary(b"\0'") 692 693class ExtensionTests(unittest.TestCase): 694 def CheckScriptStringSql(self): 695 con = sqlite.connect(":memory:") 696 cur = con.cursor() 697 cur.executescript(""" 698 -- bla bla 699 /* a stupid comment */ 700 create table a(i); 701 insert into a(i) values (5); 702 """) 703 cur.execute("select i from a") 704 res = cur.fetchone()[0] 705 self.assertEqual(res, 5) 706 707 def CheckScriptSyntaxError(self): 708 con = sqlite.connect(":memory:") 709 cur = con.cursor() 710 with self.assertRaises(sqlite.OperationalError): 711 cur.executescript("create table test(x); asdf; create table test2(x)") 712 713 def CheckScriptErrorNormal(self): 714 con = sqlite.connect(":memory:") 715 cur = con.cursor() 716 with self.assertRaises(sqlite.OperationalError): 717 cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") 718 719 def CheckCursorExecutescriptAsBytes(self): 720 con = sqlite.connect(":memory:") 721 cur = con.cursor() 722 with self.assertRaises(ValueError) as cm: 723 cur.executescript(b"create table test(foo); insert into test(foo) values (5);") 724 self.assertEqual(str(cm.exception), 'script argument must be unicode.') 725 726 def CheckConnectionExecute(self): 727 con = sqlite.connect(":memory:") 728 result = con.execute("select 5").fetchone()[0] 729 self.assertEqual(result, 5, "Basic test of Connection.execute") 730 731 def CheckConnectionExecutemany(self): 732 con = sqlite.connect(":memory:") 733 con.execute("create table test(foo)") 734 con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) 735 result = con.execute("select foo from test order by foo").fetchall() 736 self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") 737 self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") 738 739 def CheckConnectionExecutescript(self): 740 con = sqlite.connect(":memory:") 741 con.executescript("create table test(foo); insert into test(foo) values (5);") 742 result = con.execute("select foo from test").fetchone()[0] 743 self.assertEqual(result, 5, "Basic test of Connection.executescript") 744 745class ClosedConTests(unittest.TestCase): 746 def CheckClosedConCursor(self): 747 con = sqlite.connect(":memory:") 748 con.close() 749 with self.assertRaises(sqlite.ProgrammingError): 750 cur = con.cursor() 751 752 def CheckClosedConCommit(self): 753 con = sqlite.connect(":memory:") 754 con.close() 755 with self.assertRaises(sqlite.ProgrammingError): 756 con.commit() 757 758 def CheckClosedConRollback(self): 759 con = sqlite.connect(":memory:") 760 con.close() 761 with self.assertRaises(sqlite.ProgrammingError): 762 con.rollback() 763 764 def CheckClosedCurExecute(self): 765 con = sqlite.connect(":memory:") 766 cur = con.cursor() 767 con.close() 768 with self.assertRaises(sqlite.ProgrammingError): 769 cur.execute("select 4") 770 771 def CheckClosedCreateFunction(self): 772 con = sqlite.connect(":memory:") 773 con.close() 774 def f(x): return 17 775 with self.assertRaises(sqlite.ProgrammingError): 776 con.create_function("foo", 1, f) 777 778 def CheckClosedCreateAggregate(self): 779 con = sqlite.connect(":memory:") 780 con.close() 781 class Agg: 782 def __init__(self): 783 pass 784 def step(self, x): 785 pass 786 def finalize(self): 787 return 17 788 with self.assertRaises(sqlite.ProgrammingError): 789 con.create_aggregate("foo", 1, Agg) 790 791 def CheckClosedSetAuthorizer(self): 792 con = sqlite.connect(":memory:") 793 con.close() 794 def authorizer(*args): 795 return sqlite.DENY 796 with self.assertRaises(sqlite.ProgrammingError): 797 con.set_authorizer(authorizer) 798 799 def CheckClosedSetProgressCallback(self): 800 con = sqlite.connect(":memory:") 801 con.close() 802 def progress(): pass 803 with self.assertRaises(sqlite.ProgrammingError): 804 con.set_progress_handler(progress, 100) 805 806 def CheckClosedCall(self): 807 con = sqlite.connect(":memory:") 808 con.close() 809 with self.assertRaises(sqlite.ProgrammingError): 810 con() 811 812class ClosedCurTests(unittest.TestCase): 813 def CheckClosed(self): 814 con = sqlite.connect(":memory:") 815 cur = con.cursor() 816 cur.close() 817 818 for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"): 819 if method_name in ("execute", "executescript"): 820 params = ("select 4 union select 5",) 821 elif method_name == "executemany": 822 params = ("insert into foo(bar) values (?)", [(3,), (4,)]) 823 else: 824 params = [] 825 826 with self.assertRaises(sqlite.ProgrammingError): 827 method = getattr(cur, method_name) 828 method(*params) 829 830 831class SqliteOnConflictTests(unittest.TestCase): 832 """ 833 Tests for SQLite's "insert on conflict" feature. 834 835 See https://www.sqlite.org/lang_conflict.html for details. 836 """ 837 838 def setUp(self): 839 self.cx = sqlite.connect(":memory:") 840 self.cu = self.cx.cursor() 841 self.cu.execute(""" 842 CREATE TABLE test( 843 id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE 844 ); 845 """) 846 847 def tearDown(self): 848 self.cu.close() 849 self.cx.close() 850 851 def CheckOnConflictRollbackWithExplicitTransaction(self): 852 self.cx.isolation_level = None # autocommit mode 853 self.cu = self.cx.cursor() 854 # Start an explicit transaction. 855 self.cu.execute("BEGIN") 856 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 857 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 858 with self.assertRaises(sqlite.IntegrityError): 859 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 860 # Use connection to commit. 861 self.cx.commit() 862 self.cu.execute("SELECT name, unique_name from test") 863 # Transaction should have rolled back and nothing should be in table. 864 self.assertEqual(self.cu.fetchall(), []) 865 866 def CheckOnConflictAbortRaisesWithExplicitTransactions(self): 867 # Abort cancels the current sql statement but doesn't change anything 868 # about the current transaction. 869 self.cx.isolation_level = None # autocommit mode 870 self.cu = self.cx.cursor() 871 # Start an explicit transaction. 872 self.cu.execute("BEGIN") 873 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 874 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 875 with self.assertRaises(sqlite.IntegrityError): 876 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 877 self.cx.commit() 878 self.cu.execute("SELECT name, unique_name FROM test") 879 # Expect the first two inserts to work, third to do nothing. 880 self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) 881 882 def CheckOnConflictRollbackWithoutTransaction(self): 883 # Start of implicit transaction 884 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 885 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 886 with self.assertRaises(sqlite.IntegrityError): 887 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 888 self.cu.execute("SELECT name, unique_name FROM test") 889 # Implicit transaction is rolled back on error. 890 self.assertEqual(self.cu.fetchall(), []) 891 892 def CheckOnConflictAbortRaisesWithoutTransactions(self): 893 # Abort cancels the current sql statement but doesn't change anything 894 # about the current transaction. 895 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 896 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 897 with self.assertRaises(sqlite.IntegrityError): 898 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 899 # Make sure all other values were inserted. 900 self.cu.execute("SELECT name, unique_name FROM test") 901 self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) 902 903 def CheckOnConflictFail(self): 904 self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") 905 with self.assertRaises(sqlite.IntegrityError): 906 self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") 907 self.assertEqual(self.cu.fetchall(), []) 908 909 def CheckOnConflictIgnore(self): 910 self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") 911 # Nothing should happen. 912 self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") 913 self.cu.execute("SELECT unique_name FROM test") 914 self.assertEqual(self.cu.fetchall(), [('foo',)]) 915 916 def CheckOnConflictReplace(self): 917 self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')") 918 # There shouldn't be an IntegrityError exception. 919 self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')") 920 self.cu.execute("SELECT name, unique_name FROM test") 921 self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')]) 922 923 924def suite(): 925 module_suite = unittest.makeSuite(ModuleTests, "Check") 926 connection_suite = unittest.makeSuite(ConnectionTests, "Check") 927 cursor_suite = unittest.makeSuite(CursorTests, "Check") 928 thread_suite = unittest.makeSuite(ThreadTests, "Check") 929 constructor_suite = unittest.makeSuite(ConstructorTests, "Check") 930 ext_suite = unittest.makeSuite(ExtensionTests, "Check") 931 closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") 932 closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") 933 on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check") 934 return unittest.TestSuite(( 935 module_suite, connection_suite, cursor_suite, thread_suite, 936 constructor_suite, ext_suite, closed_con_suite, closed_cur_suite, 937 on_conflict_suite, 938 )) 939 940def test(): 941 runner = unittest.TextTestRunner() 942 runner.run(suite()) 943 944if __name__ == "__main__": 945 test() 946