1#-*- coding: ISO-8859-1 -*-
2# pysqlite2/test/transactions.py: tests transactions
3#
4# Copyright (C) 2005-2007 Gerhard H�ring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import sys
25import os, unittest
26import sqlite3 as sqlite
27
28def get_db_path():
29    return "sqlite_testdb"
30
31class TransactionTests(unittest.TestCase):
32    def setUp(self):
33        try:
34            os.remove(get_db_path())
35        except OSError:
36            pass
37
38        self.con1 = sqlite.connect(get_db_path(), timeout=0.1)
39        self.cur1 = self.con1.cursor()
40
41        self.con2 = sqlite.connect(get_db_path(), timeout=0.1)
42        self.cur2 = self.con2.cursor()
43
44    def tearDown(self):
45        self.cur1.close()
46        self.con1.close()
47
48        self.cur2.close()
49        self.con2.close()
50
51        try:
52            os.unlink(get_db_path())
53        except OSError:
54            pass
55
56    def CheckDMLdoesAutoCommitBefore(self):
57        self.cur1.execute("create table test(i)")
58        self.cur1.execute("insert into test(i) values (5)")
59        self.cur1.execute("create table test2(j)")
60        self.cur2.execute("select i from test")
61        res = self.cur2.fetchall()
62        self.assertEqual(len(res), 1)
63
64    def CheckInsertStartsTransaction(self):
65        self.cur1.execute("create table test(i)")
66        self.cur1.execute("insert into test(i) values (5)")
67        self.cur2.execute("select i from test")
68        res = self.cur2.fetchall()
69        self.assertEqual(len(res), 0)
70
71    def CheckUpdateStartsTransaction(self):
72        self.cur1.execute("create table test(i)")
73        self.cur1.execute("insert into test(i) values (5)")
74        self.con1.commit()
75        self.cur1.execute("update test set i=6")
76        self.cur2.execute("select i from test")
77        res = self.cur2.fetchone()[0]
78        self.assertEqual(res, 5)
79
80    def CheckDeleteStartsTransaction(self):
81        self.cur1.execute("create table test(i)")
82        self.cur1.execute("insert into test(i) values (5)")
83        self.con1.commit()
84        self.cur1.execute("delete from test")
85        self.cur2.execute("select i from test")
86        res = self.cur2.fetchall()
87        self.assertEqual(len(res), 1)
88
89    def CheckReplaceStartsTransaction(self):
90        self.cur1.execute("create table test(i)")
91        self.cur1.execute("insert into test(i) values (5)")
92        self.con1.commit()
93        self.cur1.execute("replace into test(i) values (6)")
94        self.cur2.execute("select i from test")
95        res = self.cur2.fetchall()
96        self.assertEqual(len(res), 1)
97        self.assertEqual(res[0][0], 5)
98
99    def CheckToggleAutoCommit(self):
100        self.cur1.execute("create table test(i)")
101        self.cur1.execute("insert into test(i) values (5)")
102        self.con1.isolation_level = None
103        self.assertEqual(self.con1.isolation_level, None)
104        self.cur2.execute("select i from test")
105        res = self.cur2.fetchall()
106        self.assertEqual(len(res), 1)
107
108        self.con1.isolation_level = "DEFERRED"
109        self.assertEqual(self.con1.isolation_level , "DEFERRED")
110        self.cur1.execute("insert into test(i) values (5)")
111        self.cur2.execute("select i from test")
112        res = self.cur2.fetchall()
113        self.assertEqual(len(res), 1)
114
115    def CheckRaiseTimeout(self):
116        if sqlite.sqlite_version_info < (3, 2, 2):
117            # This will fail (hang) on earlier versions of sqlite.
118            # Determine exact version it was fixed. 3.2.1 hangs.
119            return
120        self.cur1.execute("create table test(i)")
121        self.cur1.execute("insert into test(i) values (5)")
122        try:
123            self.cur2.execute("insert into test(i) values (5)")
124            self.fail("should have raised an OperationalError")
125        except sqlite.OperationalError:
126            pass
127        except:
128            self.fail("should have raised an OperationalError")
129
130    def CheckLocking(self):
131        """
132        This tests the improved concurrency with pysqlite 2.3.4. You needed
133        to roll back con2 before you could commit con1.
134        """
135        if sqlite.sqlite_version_info < (3, 2, 2):
136            # This will fail (hang) on earlier versions of sqlite.
137            # Determine exact version it was fixed. 3.2.1 hangs.
138            return
139        self.cur1.execute("create table test(i)")
140        self.cur1.execute("insert into test(i) values (5)")
141        try:
142            self.cur2.execute("insert into test(i) values (5)")
143            self.fail("should have raised an OperationalError")
144        except sqlite.OperationalError:
145            pass
146        except:
147            self.fail("should have raised an OperationalError")
148        # NO self.con2.rollback() HERE!!!
149        self.con1.commit()
150
151    def CheckRollbackCursorConsistency(self):
152        """
153        Checks if cursors on the connection are set into a "reset" state
154        when a rollback is done on the connection.
155        """
156        con = sqlite.connect(":memory:")
157        cur = con.cursor()
158        cur.execute("create table test(x)")
159        cur.execute("insert into test(x) values (5)")
160        cur.execute("select 1 union select 2 union select 3")
161
162        con.rollback()
163        try:
164            cur.fetchall()
165            self.fail("InterfaceError should have been raised")
166        except sqlite.InterfaceError, e:
167            pass
168        except:
169            self.fail("InterfaceError should have been raised")
170
171class SpecialCommandTests(unittest.TestCase):
172    def setUp(self):
173        self.con = sqlite.connect(":memory:")
174        self.cur = self.con.cursor()
175
176    def CheckVacuum(self):
177        self.cur.execute("create table test(i)")
178        self.cur.execute("insert into test(i) values (5)")
179        self.cur.execute("vacuum")
180
181    def CheckDropTable(self):
182        self.cur.execute("create table test(i)")
183        self.cur.execute("insert into test(i) values (5)")
184        self.cur.execute("drop table test")
185
186    def CheckPragma(self):
187        self.cur.execute("create table test(i)")
188        self.cur.execute("insert into test(i) values (5)")
189        self.cur.execute("pragma count_changes=1")
190
191    def tearDown(self):
192        self.cur.close()
193        self.con.close()
194
195def suite():
196    default_suite = unittest.makeSuite(TransactionTests, "Check")
197    special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check")
198    return unittest.TestSuite((default_suite, special_command_suite))
199
200def test():
201    runner = unittest.TextTestRunner()
202    runner.run(suite())
203
204if __name__ == "__main__":
205    test()
206