1#-*- coding: iso-8859-1 -*-
2# pysqlite2/test/dbapi.py: tests for DB-API compliance
3#
# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import threading
25import unittest
26import sqlite3 as sqlite
27
28from test.support import TESTFN, unlink
29
30
class ModuleTests(unittest.TestCase):
    """Checks of the module-level DB-API globals and the exception hierarchy."""

    def CheckAPILevel(self):
        """The module must report apilevel "2.0"."""
        level = sqlite.apilevel
        self.assertEqual(level, "2.0", "apilevel is %s, should be 2.0" % level)

    def CheckThreadSafety(self):
        """The module must report threadsafety 1."""
        safety = sqlite.threadsafety
        self.assertEqual(safety, 1, "threadsafety is %d, should be 1" % safety)

    def CheckParamStyle(self):
        """The module must report the 'qmark' parameter style."""
        style = sqlite.paramstyle
        self.assertEqual(
            style, "qmark", "paramstyle is '%s', should be 'qmark'" % style
        )

    def CheckWarning(self):
        """Warning derives from Exception."""
        derived = issubclass(sqlite.Warning, Exception)
        self.assertTrue(derived, "Warning is not a subclass of Exception")

    def CheckError(self):
        """Error derives from Exception."""
        derived = issubclass(sqlite.Error, Exception)
        self.assertTrue(derived, "Error is not a subclass of Exception")

    def CheckInterfaceError(self):
        """InterfaceError derives from Error."""
        derived = issubclass(sqlite.InterfaceError, sqlite.Error)
        self.assertTrue(derived, "InterfaceError is not a subclass of Error")

    def CheckDatabaseError(self):
        """DatabaseError derives from Error."""
        derived = issubclass(sqlite.DatabaseError, sqlite.Error)
        self.assertTrue(derived, "DatabaseError is not a subclass of Error")

    def CheckDataError(self):
        """DataError derives from DatabaseError."""
        derived = issubclass(sqlite.DataError, sqlite.DatabaseError)
        self.assertTrue(
            derived, "DataError is not a subclass of DatabaseError"
        )

    def CheckOperationalError(self):
        """OperationalError derives from DatabaseError."""
        derived = issubclass(sqlite.OperationalError, sqlite.DatabaseError)
        self.assertTrue(
            derived, "OperationalError is not a subclass of DatabaseError"
        )

    def CheckIntegrityError(self):
        """IntegrityError derives from DatabaseError."""
        derived = issubclass(sqlite.IntegrityError, sqlite.DatabaseError)
        self.assertTrue(
            derived, "IntegrityError is not a subclass of DatabaseError"
        )

    def CheckInternalError(self):
        """InternalError derives from DatabaseError."""
        derived = issubclass(sqlite.InternalError, sqlite.DatabaseError)
        self.assertTrue(
            derived, "InternalError is not a subclass of DatabaseError"
        )

    def CheckProgrammingError(self):
        """ProgrammingError derives from DatabaseError."""
        derived = issubclass(sqlite.ProgrammingError, sqlite.DatabaseError)
        self.assertTrue(
            derived, "ProgrammingError is not a subclass of DatabaseError"
        )

    def CheckNotSupportedError(self):
        """NotSupportedError derives from DatabaseError."""
        derived = issubclass(sqlite.NotSupportedError, sqlite.DatabaseError)
        self.assertTrue(
            derived, "NotSupportedError is not a subclass of DatabaseError"
        )
85
class ConnectionTests(unittest.TestCase):
    """Tests for Connection objects: transactions, opening and closing."""

    def setUp(self):
        # Each test gets a fresh in-memory database with one inserted row.
        self.cx = sqlite.connect(":memory:")
        cu = self.cx.cursor()
        cu.execute("create table test(id integer primary key, name text)")
        cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def _connect(self, *args, **kwargs):
        """Open an extra connection that is closed when the test finishes.

        ``with connection:`` only commits or rolls back; it does not close
        the connection.  Registering ``close`` as a cleanup makes sure the
        handle is released, and because cleanups run in reverse order it is
        released before any previously registered ``unlink`` of the file.
        """
        cx = sqlite.connect(*args, **kwargs)
        self.addCleanup(cx.close)
        return cx

    def CheckCommit(self):
        self.cx.commit()

    def CheckCommitAfterNoChanges(self):
        """
        A commit should also work when no changes were made to the database.
        """
        self.cx.commit()
        self.cx.commit()

    def CheckRollback(self):
        self.cx.rollback()

    def CheckRollbackAfterNoChanges(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        self.cx.rollback()
        self.cx.rollback()

    def CheckCursor(self):
        # Only checks that creating a cursor does not raise.
        self.cx.cursor()

    def CheckFailedOpen(self):
        """Opening a database in a non-existent directory must fail."""
        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
        with self.assertRaises(sqlite.OperationalError):
            sqlite.connect(YOU_CANNOT_OPEN_THIS)

    def CheckClose(self):
        # tearDown closes the connection a second time; this relies on
        # close() being safe to call on an already closed connection.
        self.cx.close()

    def CheckExceptions(self):
        # Optional DB-API extension: exception classes are also exposed as
        # attributes of the connection.
        self.assertEqual(self.cx.Warning, sqlite.Warning)
        self.assertEqual(self.cx.Error, sqlite.Error)
        self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
        self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
        self.assertEqual(self.cx.DataError, sqlite.DataError)
        self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
        self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
        self.assertEqual(self.cx.InternalError, sqlite.InternalError)
        self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
        self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)

    def CheckInTransaction(self):
        """in_transaction follows the implicit transaction state."""
        # Can't use db from setUp because we want to test initial state.
        cx = self._connect(":memory:")
        cu = cx.cursor()
        self.assertEqual(cx.in_transaction, False)
        cu.execute("create table transactiontest(id integer primary key, name text)")
        self.assertEqual(cx.in_transaction, False)
        cu.execute("insert into transactiontest(name) values (?)", ("foo",))
        self.assertEqual(cx.in_transaction, True)
        cu.execute("select name from transactiontest where name=?", ["foo"])
        cu.fetchone()
        self.assertEqual(cx.in_transaction, True)
        cx.commit()
        self.assertEqual(cx.in_transaction, False)
        cu.execute("select name from transactiontest where name=?", ["foo"])
        cu.fetchone()
        self.assertEqual(cx.in_transaction, False)

    def CheckInTransactionRO(self):
        """in_transaction is a read-only attribute."""
        with self.assertRaises(AttributeError):
            self.cx.in_transaction = True

    def CheckOpenWithPathLikeObject(self):
        """ Checks that we can successfully connect to a database using an object that
            is PathLike, i.e. has __fspath__(). """
        self.addCleanup(unlink, TESTFN)
        class Path:
            def __fspath__(self):
                return TESTFN
        path = Path()
        with self._connect(path) as cx:
            cx.execute('create table test(id integer)')

    def CheckOpenUri(self):
        """URI filenames work, including a read-only ``mode=ro`` open."""
        if sqlite.sqlite_version_info < (3, 7, 7):
            # The test expects uri=True to be rejected on these old versions.
            with self.assertRaises(sqlite.NotSupportedError):
                sqlite.connect(':memory:', uri=True)
            return
        self.addCleanup(unlink, TESTFN)
        with self._connect(TESTFN) as cx:
            cx.execute('create table test(id integer)')
        with self._connect('file:' + TESTFN, uri=True) as cx:
            cx.execute('insert into test(id) values(0)')
        with self._connect('file:' + TESTFN + '?mode=ro', uri=True) as cx:
            with self.assertRaises(sqlite.OperationalError):
                cx.execute('insert into test(id) values(1)')

    @unittest.skipIf(sqlite.sqlite_version_info >= (3, 3, 1),
                     'needs sqlite versions older than 3.3.1')
    def CheckSameThreadErrorOnOldVersion(self):
        with self.assertRaises(sqlite.NotSupportedError) as cm:
            sqlite.connect(':memory:', check_same_thread=False)
        self.assertEqual(str(cm.exception), 'shared connections not available')
194
class CursorTests(unittest.TestCase):
    """Tests for Cursor objects: execute(), executemany(), fetching,
    rowcount and lastrowid."""

    def setUp(self):
        # Every test starts with a table that already holds one row
        # (name 'foo'); the lastrowid checks below depend on that row.
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute(
            "create table test(id integer primary key, name text, "
            "income number, unique_test text unique)"
        )
        self.cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def CheckExecuteNoArgs(self):
        self.cu.execute("delete from test")

    def CheckExecuteIllegalSql(self):
        with self.assertRaises(sqlite.OperationalError):
            self.cu.execute("select asdf")

    def CheckExecuteTooMuchSql(self):
        # Two statements in a single execute() call are rejected.
        with self.assertRaises(sqlite.Warning):
            self.cu.execute("select 5+4; select 4+5")

    def CheckExecuteTooMuchSql2(self):
        # A trailing line comment after the statement is accepted.
        self.cu.execute("select 5+4; -- foo bar")

    def CheckExecuteTooMuchSql3(self):
        # A trailing block comment after the statement is accepted.
        self.cu.execute("""
            select 5+4;

            /*
            foo
            */
            """)

    def CheckExecuteWrongSqlArg(self):
        with self.assertRaises(ValueError):
            self.cu.execute(42)

    def CheckExecuteArgInt(self):
        self.cu.execute("insert into test(id) values (?)", (42,))

    def CheckExecuteArgFloat(self):
        self.cu.execute("insert into test(income) values (?)", (2500.32,))

    def CheckExecuteArgString(self):
        self.cu.execute("insert into test(name) values (?)", ("Hugo",))

    def CheckExecuteArgStringWithZeroByte(self):
        """A string with an embedded NUL round-trips unchanged."""
        self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))

        self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
        row = self.cu.fetchone()
        self.assertEqual(row[0], "Hu\x00go")

    def CheckExecuteNonIterable(self):
        with self.assertRaises(ValueError) as cm:
            self.cu.execute("insert into test(id) values (?)", 42)
        self.assertEqual(str(cm.exception), 'parameters are of unsupported type')

    def CheckExecuteWrongNoOfArgs1(self):
        # too many parameters
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)", (17, "Egon"))

    def CheckExecuteWrongNoOfArgs2(self):
        # too few parameters: two placeholders but only one value
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id, name) values (?, ?)", (17,))

    def CheckExecuteWrongNoOfArgs3(self):
        # no parameters, parameters are needed
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)")

    def CheckExecuteParamList(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", ["foo"])
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteParamSequence(self):
        """Any object with __len__ and __getitem__ works as parameters."""
        class L(object):
            def __len__(self):
                return 1
            def __getitem__(self, x):
                assert x == 0
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", L())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", {"name": "foo"})
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping_Mapping(self):
        """A dict subclass supplies named parameters via __missing__."""
        class D(dict):
            def __missing__(self, key):
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", D())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMappingTooLittleArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})

    def CheckExecuteDictMappingNoArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name")

    def CheckExecuteDictMappingUnnamed(self):
        # A mapping cannot satisfy an unnamed (qmark) placeholder.
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=?", {"name": "foo"})

    def CheckClose(self):
        # tearDown closes the cursor again; a second close() must not raise.
        self.cu.close()

    def CheckRowcountExecute(self):
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("update test set name='bar'")
        self.assertEqual(self.cu.rowcount, 2)

    def CheckRowcountSelect(self):
        """
        pysqlite does not know the rowcount of SELECT statements, because we
        don't fetch all rows after executing the select statement. The rowcount
        has thus to be -1.
        """
        self.cu.execute("select 5 union select 6")
        self.assertEqual(self.cu.rowcount, -1)

    def CheckRowcountExecutemany(self):
        self.cu.execute("delete from test")
        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
        self.assertEqual(self.cu.rowcount, 3)

    def CheckTotalChanges(self):
        # One row is inserted in setUp and two here, so more than two
        # changes must have been counted on the connection.
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value')

    # Checks for executemany:
    # Sequences are required by the DB-API; iterators and generators
    # are enhancements in pysqlite.

    def CheckExecuteManySequence(self):
        self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])

    def CheckExecuteManyIterator(self):
        class MyIter:
            """Minimal iterator yielding the tuples (6,) through (10,)."""

            def __init__(self):
                self.value = 5

            def __iter__(self):
                # Required by the iterator protocol alongside __next__.
                return self

            def __next__(self):
                if self.value == 10:
                    raise StopIteration
                else:
                    self.value += 1
                    return (self.value,)

        self.cu.executemany("insert into test(income) values (?)", MyIter())

    def CheckExecuteManyGenerator(self):
        def mygen():
            for i in range(5):
                yield (i,)

        self.cu.executemany("insert into test(income) values (?)", mygen())

    def CheckExecuteManyWrongSqlArg(self):
        with self.assertRaises(ValueError):
            self.cu.executemany(42, [(3,)])

    def CheckExecuteManySelect(self):
        # executemany() is rejected for a SELECT statement.
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.executemany("select ?", [(3,)])

    def CheckExecuteManyNotIterable(self):
        with self.assertRaises(TypeError):
            self.cu.executemany("insert into test(income) values (?)", 42)

    def CheckFetchIter(self):
        # Optional DB-API extension: the cursor itself is iterable.
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(id) values (?)", (5,))
        self.cu.execute("insert into test(id) values (?)", (6,))
        self.cu.execute("select id from test order by id")
        ids = [row[0] for row in self.cu]
        self.assertEqual(ids, [5, 6])

    def CheckFetchone(self):
        self.cu.execute("select name from test")
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")
        row = self.cu.fetchone()
        self.assertEqual(row, None)

    def CheckFetchoneNoStatement(self):
        # fetchone() on a cursor that never executed anything returns None.
        cur = self.cx.cursor()
        row = cur.fetchone()
        self.assertEqual(row, None)

    def CheckArraySize(self):
        # must default to 1
        self.assertEqual(self.cu.arraysize, 1)

        # now set to 2
        self.cu.arraysize = 2

        # now make the query return 3 rows
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('A')")
        self.cu.execute("insert into test(name) values ('B')")
        self.cu.execute("insert into test(name) values ('C')")
        self.cu.execute("select name from test")
        res = self.cu.fetchmany()

        # fetchmany() without a size uses arraysize
        self.assertEqual(len(res), 2)

    def CheckFetchmany(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(100)
        self.assertEqual(len(res), 1)
        res = self.cu.fetchmany(100)
        self.assertEqual(res, [])

    def CheckFetchmanyKwArg(self):
        """Checks if fetchmany works with keyword arguments"""
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(size=100)
        self.assertEqual(len(res), 1)

    def CheckFetchall(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchall()
        self.assertEqual(len(res), 1)
        res = self.cu.fetchall()
        self.assertEqual(res, [])

    def CheckSetinputsizes(self):
        self.cu.setinputsizes([3, 4, 5])

    def CheckSetoutputsize(self):
        self.cu.setoutputsize(5, 0)

    def CheckSetoutputsizeNoColumn(self):
        self.cu.setoutputsize(42)

    def CheckCursorConnection(self):
        # Optional DB-API extension.
        self.assertEqual(self.cu.connection, self.cx)

    def CheckWrongCursorCallable(self):
        """A cursor factory that does not return a cursor is rejected."""
        def f(): pass
        with self.assertRaises(TypeError):
            self.cx.cursor(f)

    def CheckCursorWrongClass(self):
        """Cursor() requires a real connection object."""
        class Foo: pass
        foo = Foo()
        with self.assertRaises(TypeError):
            sqlite.Cursor(foo)

    def CheckLastRowIDOnReplace(self):
        """
        INSERT OR REPLACE and REPLACE INTO should produce the same behavior.
        """
        sql = '{} INTO test(id, unique_test) VALUES (?, ?)'
        for statement in ('INSERT OR REPLACE', 'REPLACE'):
            with self.subTest(statement=statement):
                self.cu.execute(sql.format(statement), (1, 'foo'))
                self.assertEqual(self.cu.lastrowid, 1)

    def CheckLastRowIDOnIgnore(self):
        """An ignored insert leaves lastrowid at its previous value."""
        self.cu.execute(
            "insert or ignore into test(unique_test) values (?)",
            ('test',))
        self.assertEqual(self.cu.lastrowid, 2)
        self.cu.execute(
            "insert or ignore into test(unique_test) values (?)",
            ('test',))
        self.assertEqual(self.cu.lastrowid, 2)

    def CheckLastRowIDInsertOR(self):
        """A failed INSERT OR FAIL/ABORT/ROLLBACK keeps the prior lastrowid."""
        results = []
        for statement in ('FAIL', 'ABORT', 'ROLLBACK'):
            sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)'
            with self.subTest(statement='INSERT OR {}'.format(statement)):
                # First insert succeeds; the identical second one violates
                # the unique constraint.
                self.cu.execute(sql.format(statement), (statement,))
                results.append((statement, self.cu.lastrowid))
                with self.assertRaises(sqlite.IntegrityError):
                    self.cu.execute(sql.format(statement), (statement,))
                results.append((statement, self.cu.lastrowid))
        expected = [
            ('FAIL', 2), ('FAIL', 2),
            ('ABORT', 3), ('ABORT', 3),
            ('ROLLBACK', 4), ('ROLLBACK', 4),
        ]
        self.assertEqual(results, expected)
512
513
class ThreadTests(unittest.TestCase):
    """Each test uses a connection or cursor created in the main thread from
    a second thread and expects that use to raise ProgrammingError."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _run_test(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` in a new thread and fail the test
        unless the call raises sqlite.ProgrammingError.

        Problems are collected in a list rather than asserted inside the
        thread, because an exception raised in the worker thread would not
        propagate to the test runner.
        """
        def run(errors):
            try:
                fn(*args, **kwargs)
            except sqlite.ProgrammingError:
                pass
            except Exception as exc:
                errors.append("raised wrong exception: %r" % (exc,))
            else:
                errors.append("did not raise ProgrammingError")

        errors = []
        t = threading.Thread(target=run, kwargs={"errors": errors})
        t.start()
        t.join()
        if errors:
            self.fail("\n".join(errors))

    def CheckConCursor(self):
        self._run_test(self.con.cursor)

    def CheckConCommit(self):
        self._run_test(self.con.commit)

    def CheckConRollback(self):
        self._run_test(self.con.rollback)

    def CheckConClose(self):
        self._run_test(self.con.close)

    def CheckCurImplicitBegin(self):
        self._run_test(self.cur.execute, "insert into test(name) values ('a')")

    def CheckCurClose(self):
        self._run_test(self.cur.close)

    def CheckCurExecute(self):
        self.cur.execute("insert into test(name) values ('a')")
        self._run_test(self.cur.execute, "select name from test")

    def CheckCurIterNext(self):
        # Leave a pending result set so fetchone() would have a row to return.
        self.cur.execute("insert into test(name) values ('a')")
        self.cur.execute("select name from test")
        self._run_test(self.cur.fetchone)
670
class ConstructorTests(unittest.TestCase):
    """Smoke tests: each DB-API type constructor must be callable without
    raising.  The returned objects are not inspected."""

    def CheckDate(self):
        """Date(year, month, day) can be called."""
        sqlite.Date(2004, 10, 28)

    def CheckTime(self):
        """Time(hour, minute, second) can be called."""
        sqlite.Time(12, 39, 35)

    def CheckTimestamp(self):
        """Timestamp(year, month, day, hour, minute, second) can be called."""
        sqlite.Timestamp(2004, 10, 28, 12, 39, 35)

    def CheckDateFromTicks(self):
        """DateFromTicks(ticks) can be called."""
        sqlite.DateFromTicks(42)

    def CheckTimeFromTicks(self):
        """TimeFromTicks(ticks) can be called."""
        sqlite.TimeFromTicks(42)

    def CheckTimestampFromTicks(self):
        """TimestampFromTicks(ticks) can be called."""
        sqlite.TimestampFromTicks(42)

    def CheckBinary(self):
        """Binary() accepts a bytes object containing a NUL and a quote."""
        sqlite.Binary(b"\0'")
692
class ExtensionTests(unittest.TestCase):
    """Tests for pysqlite extensions beyond the DB-API: executescript() and
    the execute*() shortcut methods on Connection."""

    def setUp(self):
        # One fresh in-memory database per test; closed again in tearDown so
        # no connection is left open.
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def CheckScriptStringSql(self):
        """executescript() runs several statements and skips comments."""
        self.cur.executescript("""
            -- bla bla
            /* a stupid comment */
            create table a(i);
            insert into a(i) values (5);
            """)
        self.cur.execute("select i from a")
        res = self.cur.fetchone()[0]
        self.assertEqual(res, 5)

    def CheckScriptSyntaxError(self):
        with self.assertRaises(sqlite.OperationalError):
            self.cur.executescript("create table test(x); asdf; create table test2(x)")

    def CheckScriptErrorNormal(self):
        # The second statement selects from a table that does not exist.
        with self.assertRaises(sqlite.OperationalError):
            self.cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")

    def CheckCursorExecutescriptAsBytes(self):
        """executescript() rejects a bytes script."""
        with self.assertRaises(ValueError) as cm:
            self.cur.executescript(b"create table test(foo); insert into test(foo) values (5);")
        self.assertEqual(str(cm.exception), 'script argument must be unicode.')

    def CheckConnectionExecute(self):
        result = self.con.execute("select 5").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.execute")

    def CheckConnectionExecutemany(self):
        self.con.execute("create table test(foo)")
        self.con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
        result = self.con.execute("select foo from test order by foo").fetchall()
        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def CheckConnectionExecutescript(self):
        self.con.executescript("create table test(foo); insert into test(foo) values (5);")
        result = self.con.execute("select foo from test").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.executescript")
744
class ClosedConTests(unittest.TestCase):
    """Operations on an already closed connection must raise
    ProgrammingError."""

    def _closed_connection(self):
        """Return an in-memory connection that has already been closed."""
        closed = sqlite.connect(":memory:")
        closed.close()
        return closed

    def CheckClosedConCursor(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed.cursor()

    def CheckClosedConCommit(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed.commit()

    def CheckClosedConRollback(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed.rollback()

    def CheckClosedCurExecute(self):
        # The cursor has to be created while the connection is still open.
        connection = sqlite.connect(":memory:")
        cursor = connection.cursor()
        connection.close()
        with self.assertRaises(sqlite.ProgrammingError):
            cursor.execute("select 4")

    def CheckClosedCreateFunction(self):
        closed = self._closed_connection()
        seventeen = lambda x: 17
        with self.assertRaises(sqlite.ProgrammingError):
            closed.create_function("foo", 1, seventeen)

    def CheckClosedCreateAggregate(self):
        class Agg:
            """Stub aggregate class passed to create_aggregate()."""
            def __init__(self):
                pass
            def step(self, x):
                pass
            def finalize(self):
                return 17

        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed.create_aggregate("foo", 1, Agg)

    def CheckClosedSetAuthorizer(self):
        closed = self._closed_connection()
        authorizer = lambda *args: sqlite.DENY
        with self.assertRaises(sqlite.ProgrammingError):
            closed.set_authorizer(authorizer)

    def CheckClosedSetProgressCallback(self):
        closed = self._closed_connection()
        progress = lambda: None
        with self.assertRaises(sqlite.ProgrammingError):
            closed.set_progress_handler(progress, 100)

    def CheckClosedCall(self):
        closed = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            closed()
811
class ClosedCurTests(unittest.TestCase):
    """Operations on a closed cursor must raise ProgrammingError."""

    def CheckClosed(self):
        """Every execute*/fetch* method of a closed cursor raises."""
        con = sqlite.connect(":memory:")
        # Release the connection when the test finishes, pass or fail.
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.close()

        # Method name -> positional arguments for the call.
        calls = (
            ("execute", ("select 4 union select 5",)),
            ("executemany", ("insert into foo(bar) values (?)", [(3,), (4,)])),
            ("executescript", ("select 4 union select 5",)),
            ("fetchall", ()),
            ("fetchmany", ()),
            ("fetchone", ()),
        )
        for method_name, params in calls:
            # subTest reports each method separately, so one failure does
            # not hide the others and the failing method is named.
            with self.subTest(method_name=method_name):
                # Looked up outside assertRaises so that only the call
                # itself is expected to raise.
                method = getattr(cur, method_name)
                with self.assertRaises(sqlite.ProgrammingError):
                    method(*params)
829
830
class SqliteOnConflictTests(unittest.TestCase):
    """
    Tests for SQLite's "insert on conflict" feature.

    Every test runs against a fresh in-memory database containing one table,
    ``test``, whose ``unique_name`` column has a UNIQUE constraint.  Inserting
    the same ``unique_name`` twice is what triggers the conflict handling.

    See https://www.sqlite.org/lang_conflict.html for details.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("""
          CREATE TABLE test(
            id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE
          );
        """)

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def _begin_explicit_transaction(self):
        # Shared setup for the explicit-transaction tests: switch the
        # connection to autocommit mode and open a transaction by hand.
        # The cursor created in setUp() is closed before being replaced,
        # because tearDown() only closes whatever self.cu refers to then.
        self.cx.isolation_level = None  # autocommit mode
        self.cu.close()
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self.cu.execute("BEGIN")

    def CheckOnConflictRollbackWithExplicitTransaction(self):
        self._begin_explicit_transaction()
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        # Use connection to commit.
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name from test")
        # Transaction should have rolled back and nothing should be in table.
        self.assertEqual(self.cu.fetchall(), [])

    def CheckOnConflictAbortRaisesWithExplicitTransactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        self._begin_explicit_transaction()
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name FROM test")
        # Expect the first two inserts to work, third to do nothing.
        self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])

    def CheckOnConflictRollbackWithoutTransaction(self):
        # Start of implicit transaction
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        self.cu.execute("SELECT name, unique_name FROM test")
        # Implicit transaction is rolled back on error.
        self.assertEqual(self.cu.fetchall(), [])

    def CheckOnConflictAbortRaisesWithoutTransactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        # Make sure all other values were inserted.
        self.cu.execute("SELECT name, unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])

    def CheckOnConflictFail(self):
        self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
        # The failed statement itself yields no rows ...
        self.assertEqual(self.cu.fetchall(), [])
        # ... and the row inserted before the conflict must still be there;
        # query the table so the FAIL behaviour is actually verified.
        self.cu.execute("SELECT unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('foo',)])

    def CheckOnConflictIgnore(self):
        self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
        # Nothing should happen.
        self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
        self.cu.execute("SELECT unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('foo',)])

    def CheckOnConflictReplace(self):
        self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')")
        # There shouldn't be an IntegrityError exception.
        self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')")
        self.cu.execute("SELECT name, unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])
922
923
def suite():
    """Return a TestSuite holding one sub-suite per test case class.

    The test methods use the "Check" name prefix instead of unittest's
    default "test", so the loader is configured with that prefix.
    """
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in 3.13.
    # A TestLoader with testMethodPrefix set collects the same methods and
    # works on every Python version.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    test_cases = (
        ModuleTests, ConnectionTests, CursorTests, ThreadTests,
        ConstructorTests, ExtensionTests, ClosedConTests, ClosedCurTests,
        SqliteOnConflictTests,
    )
    return unittest.TestSuite(
        loader.loadTestsFromTestCase(test_case) for test_case in test_cases
    )
939
def test():
    """Run everything returned by suite() with a plain text test runner."""
    unittest.TextTestRunner().run(suite())
943
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test()
946