1#-*- coding: ISO-8859-1 -*-
2# pysqlite2/test/dbapi.py: tests for DB-API compliance
3#
# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import unittest
25import sys
26import sqlite3 as sqlite
27from test import test_support
28try:
29    import threading
30except ImportError:
31    threading = None
32
class ModuleTests(unittest.TestCase):
    """Checks of the module-level DB-API globals and exception hierarchy."""

    def _require_subclass(self, derived, base, message):
        # Shared assertion for the hierarchy checks below: fails with
        # *message* unless *derived* is a subclass of *base*.
        self.assertTrue(issubclass(derived, base), message)

    def CheckAPILevel(self):
        level = sqlite.apilevel
        self.assertEqual(level, "2.0",
                         "apilevel is %s, should be 2.0" % level)

    def CheckThreadSafety(self):
        safety = sqlite.threadsafety
        self.assertEqual(safety, 1,
                         "threadsafety is %d, should be 1" % safety)

    def CheckParamStyle(self):
        style = sqlite.paramstyle
        self.assertEqual(style, "qmark",
                         "paramstyle is '%s', should be 'qmark'" % style)

    def CheckWarning(self):
        self._require_subclass(
            sqlite.Warning, StandardError,
            "Warning is not a subclass of StandardError")

    def CheckError(self):
        self._require_subclass(
            sqlite.Error, StandardError,
            "Error is not a subclass of StandardError")

    def CheckInterfaceError(self):
        self._require_subclass(
            sqlite.InterfaceError, sqlite.Error,
            "InterfaceError is not a subclass of Error")

    def CheckDatabaseError(self):
        self._require_subclass(
            sqlite.DatabaseError, sqlite.Error,
            "DatabaseError is not a subclass of Error")

    def CheckDataError(self):
        self._require_subclass(
            sqlite.DataError, sqlite.DatabaseError,
            "DataError is not a subclass of DatabaseError")

    def CheckOperationalError(self):
        self._require_subclass(
            sqlite.OperationalError, sqlite.DatabaseError,
            "OperationalError is not a subclass of DatabaseError")

    def CheckIntegrityError(self):
        self._require_subclass(
            sqlite.IntegrityError, sqlite.DatabaseError,
            "IntegrityError is not a subclass of DatabaseError")

    def CheckInternalError(self):
        self._require_subclass(
            sqlite.InternalError, sqlite.DatabaseError,
            "InternalError is not a subclass of DatabaseError")

    def CheckProgrammingError(self):
        self._require_subclass(
            sqlite.ProgrammingError, sqlite.DatabaseError,
            "ProgrammingError is not a subclass of DatabaseError")

    def CheckNotSupportedError(self):
        self._require_subclass(
            sqlite.NotSupportedError, sqlite.DatabaseError,
            "NotSupportedError is not a subclass of DatabaseError")
87
class ConnectionTests(unittest.TestCase):
    """Basic operations on a connection to an in-memory database."""

    def setUp(self):
        # Every test starts with a fresh database holding one table that
        # contains a single, not yet committed, row.
        self.cx = sqlite.connect(":memory:")
        cursor = self.cx.cursor()
        cursor.execute("create table test(id integer primary key, name text)")
        cursor.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def CheckCommit(self):
        self.cx.commit()

    def CheckCommitAfterNoChanges(self):
        """
        A commit should also work when no changes were made to the database.
        """
        # The first call commits the row inserted by setUp(); the second one
        # has nothing left to commit.
        for _ in range(2):
            self.cx.commit()

    def CheckRollback(self):
        self.cx.rollback()

    def CheckRollbackAfterNoChanges(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        # The first call undoes the insert from setUp(); the second one has
        # nothing left to roll back.
        for _ in range(2):
            self.cx.rollback()

    def CheckCursor(self):
        # Only verifies that a cursor can be created.
        self.cx.cursor()

    def CheckFailedOpen(self):
        unopenable_path = "/foo/bar/bla/23534/mydb.db"
        try:
            sqlite.connect(unopenable_path)
        except sqlite.OperationalError:
            pass
        else:
            self.fail("should have raised an OperationalError")

    def CheckClose(self):
        self.cx.close()

    def CheckExceptions(self):
        # Optional DB-API extension: the exception classes are also
        # reachable as attributes of the connection.
        exception_names = (
            "Warning",
            "Error",
            "InterfaceError",
            "DatabaseError",
            "DataError",
            "OperationalError",
            "IntegrityError",
            "InternalError",
            "ProgrammingError",
            "NotSupportedError",
        )
        for exc_name in exception_names:
            self.assertEqual(getattr(self.cx, exc_name),
                             getattr(sqlite, exc_name))
144
class CursorTests(unittest.TestCase):
    """Tests for cursor behaviour: execute()/executemany() argument
    handling, rowcount, the fetch methods and cursor construction.

    Expected exceptions are checked with assertRaises() so that a missing
    exception is reported as such and an unexpected exception surfaces with
    its own type and traceback.
    """

    def setUp(self):
        # Fresh in-memory database with one table containing a single row
        # whose name is "foo".
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("create table test(id integer primary key, name text, income number)")
        self.cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def CheckExecuteNoArgs(self):
        self.cu.execute("delete from test")

    def CheckExecuteIllegalSql(self):
        with self.assertRaises(sqlite.OperationalError):
            self.cu.execute("select asdf")

    def CheckExecuteTooMuchSql(self):
        # Two statements in a single execute() call are expected to be
        # rejected with sqlite.Warning.
        with self.assertRaises(sqlite.Warning):
            self.cu.execute("select 5+4; select 4+5")

    def CheckExecuteTooMuchSql2(self):
        # A trailing line comment after the statement must be accepted.
        self.cu.execute("select 5+4; -- foo bar")

    def CheckExecuteTooMuchSql3(self):
        # A trailing block comment after the statement must be accepted.
        self.cu.execute("""
            select 5+4;

            /*
            foo
            */
            """)

    def CheckExecuteWrongSqlArg(self):
        with self.assertRaises(ValueError):
            self.cu.execute(42)

    def CheckExecuteArgInt(self):
        self.cu.execute("insert into test(id) values (?)", (42,))

    def CheckExecuteArgFloat(self):
        self.cu.execute("insert into test(income) values (?)", (2500.32,))

    def CheckExecuteArgString(self):
        self.cu.execute("insert into test(name) values (?)", ("Hugo",))

    def CheckExecuteArgStringWithZeroByte(self):
        self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))

        self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
        row = self.cu.fetchone()
        self.assertEqual(row[0], "Hu\x00go")

    def CheckExecuteWrongNoOfArgs1(self):
        # too many parameters
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)", (17, "Egon"))

    def CheckExecuteWrongNoOfArgs2(self):
        # too few parameters: two placeholders, one value supplied.
        # (This test used to be an exact copy of CheckExecuteWrongNoOfArgs3.)
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id, name) values (?, ?)", (17,))

    def CheckExecuteWrongNoOfArgs3(self):
        # no parameters, parameters are needed
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)")

    def CheckExecuteParamList(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", ["foo"])
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteParamSequence(self):
        # Minimal object offering only __len__ and __getitem__.
        class L(object):
            def __len__(self):
                return 1
            def __getitem__(self, x):
                assert x == 0
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", L())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", {"name": "foo"})
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping_Mapping(self):
        # Test only works with Python 2.5 or later
        if sys.version_info < (2, 5, 0):
            return

        # Empty dict subclass that answers every lookup through __missing__.
        class D(dict):
            def __missing__(self, key):
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", D())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMappingTooLittleArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})

    def CheckExecuteDictMappingNoArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name")

    def CheckExecuteDictMappingUnnamed(self):
        # A mapping supplied for a positional ("?") placeholder.
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=?", {"name": "foo"})

    def CheckClose(self):
        self.cu.close()

    def CheckRowcountExecute(self):
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("update test set name='bar'")
        self.assertEqual(self.cu.rowcount, 2)

    def CheckRowcountSelect(self):
        """
        pysqlite does not know the rowcount of SELECT statements, because we
        don't fetch all rows after executing the select statement. The rowcount
        has thus to be -1.
        """
        self.cu.execute("select 5 union select 6")
        self.assertEqual(self.cu.rowcount, -1)

    def CheckRowcountExecutemany(self):
        self.cu.execute("delete from test")
        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
        self.assertEqual(self.cu.rowcount, 3)

    def CheckTotalChanges(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertGreaterEqual(self.cx.total_changes, 2,
                                "total changes reported wrong value")

    # Checks for executemany:
    # Sequences are required by the DB-API; support for iterators and
    # generators is an enhancement in pysqlite.

    def CheckExecuteManySequence(self):
        self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])

    def CheckExecuteManyIterator(self):
        # Hand-written iterator yielding the 1-tuples (6,) through (10,).
        class MyIter:
            def __init__(self):
                self.value = 5

            def next(self):
                if self.value == 10:
                    raise StopIteration
                else:
                    self.value += 1
                    return (self.value,)

        self.cu.executemany("insert into test(income) values (?)", MyIter())

    def CheckExecuteManyGenerator(self):
        def mygen():
            for i in range(5):
                yield (i,)

        self.cu.executemany("insert into test(income) values (?)", mygen())

    def CheckExecuteManyWrongSqlArg(self):
        with self.assertRaises(ValueError):
            self.cu.executemany(42, [(3,)])

    def CheckExecuteManySelect(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.executemany("select ?", [(3,)])

    def CheckExecuteManyNotIterable(self):
        with self.assertRaises(TypeError):
            self.cu.executemany("insert into test(income) values (?)", 42)

    def CheckFetchIter(self):
        # Optional DB-API extension.
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(id) values (?)", (5,))
        self.cu.execute("insert into test(id) values (?)", (6,))
        self.cu.execute("select id from test order by id")
        ids = [row[0] for row in self.cu]
        self.assertEqual(ids, [5, 6])

    def CheckFetchone(self):
        self.cu.execute("select name from test")
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")
        # The result set is exhausted after the single row.
        row = self.cu.fetchone()
        self.assertIsNone(row)

    def CheckFetchoneNoStatement(self):
        cur = self.cx.cursor()
        row = cur.fetchone()
        self.assertIsNone(row)

    def CheckArraySize(self):
        # must default to 1
        self.assertEqual(self.cu.arraysize, 1)

        # now set to 2
        self.cu.arraysize = 2

        # now make the query return 3 rows
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('A')")
        self.cu.execute("insert into test(name) values ('B')")
        self.cu.execute("insert into test(name) values ('C')")
        self.cu.execute("select name from test")
        res = self.cu.fetchmany()

        # fetchmany() without an argument returns arraysize rows.
        self.assertEqual(len(res), 2)

    def CheckFetchmany(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(100)
        self.assertEqual(len(res), 1)
        res = self.cu.fetchmany(100)
        self.assertEqual(res, [])

    def CheckFetchmanyKwArg(self):
        """Checks if fetchmany works with keyword arguments"""
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(size=100)
        self.assertEqual(len(res), 1)

    def CheckFetchall(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchall()
        self.assertEqual(len(res), 1)
        res = self.cu.fetchall()
        self.assertEqual(res, [])

    def CheckSetinputsizes(self):
        self.cu.setinputsizes([3, 4, 5])

    def CheckSetoutputsize(self):
        self.cu.setoutputsize(5, 0)

    def CheckSetoutputsizeNoColumn(self):
        self.cu.setoutputsize(42)

    def CheckCursorConnection(self):
        # Optional DB-API extension.
        self.assertEqual(self.cu.connection, self.cx)

    def CheckWrongCursorCallable(self):
        # A callable that is not a usable cursor factory.
        def f(): pass
        with self.assertRaises(TypeError):
            self.cx.cursor(f)

    def CheckCursorWrongClass(self):
        # Constructing a Cursor from something that is not a connection.
        class Foo: pass
        foo = Foo()
        with self.assertRaises(TypeError):
            sqlite.Cursor(foo)
478
@unittest.skipUnless(threading, 'This test requires threading.')
class ThreadTests(unittest.TestCase):
    """Checks that a connection or cursor created in one thread refuses to
    be used from another thread by raising ProgrammingError.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _assert_rejected_in_other_thread(self, operation):
        # Run the zero-argument callable *operation* in a new thread and fail
        # the test unless it raises sqlite.ProgrammingError there.
        #
        # Problems are collected in a list and reported from the calling
        # thread after join(), because an assertion raised inside the worker
        # would not propagate to the thread running the test.
        errors = []

        def run():
            try:
                operation()
            except sqlite.ProgrammingError:
                pass
            except Exception as exc:
                # Keep the actual exception so the failure is diagnosable.
                errors.append("raised wrong exception: %r" % (exc,))
            else:
                errors.append("did not raise ProgrammingError")

        worker = threading.Thread(target=run)
        worker.start()
        worker.join()
        if errors:
            self.fail("\n".join(errors))

    def CheckConCursor(self):
        self._assert_rejected_in_other_thread(self.con.cursor)

    def CheckConCommit(self):
        self._assert_rejected_in_other_thread(self.con.commit)

    def CheckConRollback(self):
        self._assert_rejected_in_other_thread(self.con.rollback)

    def CheckConClose(self):
        self._assert_rejected_in_other_thread(self.con.close)

    def CheckCurImplicitBegin(self):
        self._assert_rejected_in_other_thread(
            lambda: self.cur.execute("insert into test(name) values ('a')"))

    def CheckCurClose(self):
        self._assert_rejected_in_other_thread(self.cur.close)

    def CheckCurExecute(self):
        # Insert from the owning thread first, then try to query from
        # the worker thread.
        self.cur.execute("insert into test(name) values ('a')")
        self._assert_rejected_in_other_thread(
            lambda: self.cur.execute("select name from test"))

    def CheckCurIterNext(self):
        # Prepare a pending result set in the owning thread, then try to
        # fetch from it in the worker thread.
        self.cur.execute("insert into test(name) values ('a')")
        self.cur.execute("select name from test")
        self._assert_rejected_in_other_thread(self.cur.fetchone)
636
class ConstructorTests(unittest.TestCase):
    """Smoke tests: each DB-API type constructor can be called without
    raising. The constructed values are not inspected.
    """

    def CheckDate(self):
        year, month, day = 2004, 10, 28
        sqlite.Date(year, month, day)

    def CheckTime(self):
        hour, minute, second = 12, 39, 35
        sqlite.Time(hour, minute, second)

    def CheckTimestamp(self):
        date_part = (2004, 10, 28)
        time_part = (12, 39, 35)
        sqlite.Timestamp(*(date_part + time_part))

    def CheckDateFromTicks(self):
        ticks = 42
        sqlite.DateFromTicks(ticks)

    def CheckTimeFromTicks(self):
        ticks = 42
        sqlite.TimeFromTicks(ticks)

    def CheckTimestampFromTicks(self):
        ticks = 42
        sqlite.TimestampFromTicks(ticks)

    def CheckBinary(self):
        # Payload is a NUL byte followed by a single quote.
        payload = chr(0) + "'"
        with test_support.check_py3k_warnings():
            sqlite.Binary(payload)
659
class ExtensionTests(unittest.TestCase):
    """Tests for pysqlite extensions beyond the DB-API: executescript() and
    the execute*() shortcut methods on the connection object.

    Each test runs against its own in-memory database, which is opened in
    setUp() and closed again in tearDown() so no connection is leaked.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def CheckScriptStringSql(self):
        cur = self.con.cursor()
        cur.executescript("""
            -- bla bla
            /* a stupid comment */
            create table a(i);
            insert into a(i) values (5);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 5)

    def CheckScriptStringUnicode(self):
        cur = self.con.cursor()
        # The script is deliberately passed as a unicode literal.
        cur.executescript(u"""
            create table a(i);
            insert into a(i) values (5);
            select i from a;
            delete from a;
            insert into a(i) values (6);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 6)

    def CheckScriptSyntaxError(self):
        cur = self.con.cursor()
        # The middle statement ("asdf") is not valid SQL.
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(x); asdf; create table test2(x)")

    def CheckScriptErrorNormal(self):
        cur = self.con.cursor()
        # The second statement selects from a table that was never created.
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")

    def CheckConnectionExecute(self):
        result = self.con.execute("select 5").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.execute")

    def CheckConnectionExecutemany(self):
        self.con.execute("create table test(foo)")
        self.con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
        result = self.con.execute("select foo from test order by foo").fetchall()
        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def CheckConnectionExecutescript(self):
        self.con.executescript("create table test(foo); insert into test(foo) values (5);")
        result = self.con.execute("select foo from test").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.executescript")
726
class ClosedConTests(unittest.TestCase):
    """Checks that operations on a closed connection raise ProgrammingError.

    assertRaises() is used instead of try/except with a bare ``except:`` so
    that an unexpected exception type is reported with its own traceback
    rather than being replaced by a generic failure message.
    """

    def CheckClosedConCursor(self):
        con = sqlite.connect(":memory:")
        con.close()
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckClosedConCommit(self):
        con = sqlite.connect(":memory:")
        con.close()
        with self.assertRaises(sqlite.ProgrammingError):
            con.commit()

    def CheckClosedConRollback(self):
        con = sqlite.connect(":memory:")
        con.close()
        with self.assertRaises(sqlite.ProgrammingError):
            con.rollback()

    def CheckClosedCurExecute(self):
        # The cursor is created while the connection is still open and used
        # only after the connection has been closed.
        con = sqlite.connect(":memory:")
        cur = con.cursor()
        con.close()
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4")

    def CheckClosedCreateFunction(self):
        con = sqlite.connect(":memory:")
        con.close()
        def f(x): return 17
        with self.assertRaises(sqlite.ProgrammingError):
            con.create_function("foo", 1, f)

    def CheckClosedCreateAggregate(self):
        con = sqlite.connect(":memory:")
        con.close()
        class Agg:
            def __init__(self):
                pass
            def step(self, x):
                pass
            def finalize(self):
                return 17
        with self.assertRaises(sqlite.ProgrammingError):
            con.create_aggregate("foo", 1, Agg)

    def CheckClosedSetAuthorizer(self):
        con = sqlite.connect(":memory:")
        con.close()
        def authorizer(*args):
            return sqlite.DENY
        with self.assertRaises(sqlite.ProgrammingError):
            con.set_authorizer(authorizer)

    def CheckClosedSetProgressCallback(self):
        con = sqlite.connect(":memory:")
        con.close()
        def progress(): pass
        with self.assertRaises(sqlite.ProgrammingError):
            con.set_progress_handler(progress, 100)

    def CheckClosedCall(self):
        # Calling the closed connection object itself.
        con = sqlite.connect(":memory:")
        con.close()
        with self.assertRaises(sqlite.ProgrammingError):
            con()
844
class ClosedCurTests(unittest.TestCase):
    """Checks that the methods of a closed cursor raise ProgrammingError."""

    def CheckClosed(self):
        # (method name, positional arguments) for every cursor method that is
        # exercised on the closed cursor, in the order they are tried.
        calls = (
            ("execute", ("select 4 union select 5",)),
            ("executemany", ("insert into foo(bar) values (?)", [(3,), (4,)])),
            ("executescript", ("select 4 union select 5",)),
            ("fetchall", ()),
            ("fetchmany", ()),
            ("fetchone", ()),
        )

        con = sqlite.connect(":memory:")
        try:
            cur = con.cursor()
            cur.close()

            for method_name, params in calls:
                method = getattr(cur, method_name)
                try:
                    method(*params)
                except sqlite.ProgrammingError:
                    continue
                # Any other exception type propagates with its own traceback;
                # getting here means that nothing was raised at all.
                self.fail("Should have raised a ProgrammingError: method " + method_name)
        finally:
            # Release the connection even when an assertion above fails.
            con.close()
874
def suite():
    """Build one TestSuite from the "Check"-prefixed methods of every test
    class in this module.
    """
    case_classes = (
        ModuleTests,
        ConnectionTests,
        CursorTests,
        ThreadTests,
        ConstructorTests,
        ExtensionTests,
        ClosedConTests,
        ClosedCurTests,
    )
    # The test methods are named Check* rather than test*, so the prefix
    # has to be passed explicitly.
    return unittest.TestSuite(
        tuple(unittest.makeSuite(case, "Check") for case in case_classes))
885
def test():
    """Run the complete suite with a plain-text test runner."""
    unittest.TextTestRunner().run(suite())
889
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test()
892