1import sqlite3 as sqlite
2import unittest
3
4
5@unittest.skipIf(sqlite.sqlite_version_info < (3, 6, 11), "Backup API not supported")
6class BackupTests(unittest.TestCase):
7    def setUp(self):
8        cx = self.cx = sqlite.connect(":memory:")
9        cx.execute('CREATE TABLE foo (key INTEGER)')
10        cx.executemany('INSERT INTO foo (key) VALUES (?)', [(3,), (4,)])
11        cx.commit()
12
13    def tearDown(self):
14        self.cx.close()
15
16    def verify_backup(self, bckcx):
17        result = bckcx.execute("SELECT key FROM foo ORDER BY key").fetchall()
18        self.assertEqual(result[0][0], 3)
19        self.assertEqual(result[1][0], 4)
20
21    def test_bad_target_none(self):
22        with self.assertRaises(TypeError):
23            self.cx.backup(None)
24
25    def test_bad_target_filename(self):
26        with self.assertRaises(TypeError):
27            self.cx.backup('some_file_name.db')
28
29    def test_bad_target_same_connection(self):
30        with self.assertRaises(ValueError):
31            self.cx.backup(self.cx)
32
33    def test_bad_target_closed_connection(self):
34        bck = sqlite.connect(':memory:')
35        bck.close()
36        with self.assertRaises(sqlite.ProgrammingError):
37            self.cx.backup(bck)
38
39    def test_bad_target_in_transaction(self):
40        bck = sqlite.connect(':memory:')
41        bck.execute('CREATE TABLE bar (key INTEGER)')
42        bck.executemany('INSERT INTO bar (key) VALUES (?)', [(3,), (4,)])
43        with self.assertRaises(sqlite.OperationalError) as cm:
44            self.cx.backup(bck)
45        if sqlite.sqlite_version_info < (3, 8, 8):
46            self.assertEqual(str(cm.exception), 'target is in transaction')
47
48    def test_keyword_only_args(self):
49        with self.assertRaises(TypeError):
50            with sqlite.connect(':memory:') as bck:
51                self.cx.backup(bck, 1)
52
53    def test_simple(self):
54        with sqlite.connect(':memory:') as bck:
55            self.cx.backup(bck)
56            self.verify_backup(bck)
57
58    def test_progress(self):
59        journal = []
60
61        def progress(status, remaining, total):
62            journal.append(status)
63
64        with sqlite.connect(':memory:') as bck:
65            self.cx.backup(bck, pages=1, progress=progress)
66            self.verify_backup(bck)
67
68        self.assertEqual(len(journal), 2)
69        self.assertEqual(journal[0], sqlite.SQLITE_OK)
70        self.assertEqual(journal[1], sqlite.SQLITE_DONE)
71
72    def test_progress_all_pages_at_once_1(self):
73        journal = []
74
75        def progress(status, remaining, total):
76            journal.append(remaining)
77
78        with sqlite.connect(':memory:') as bck:
79            self.cx.backup(bck, progress=progress)
80            self.verify_backup(bck)
81
82        self.assertEqual(len(journal), 1)
83        self.assertEqual(journal[0], 0)
84
85    def test_progress_all_pages_at_once_2(self):
86        journal = []
87
88        def progress(status, remaining, total):
89            journal.append(remaining)
90
91        with sqlite.connect(':memory:') as bck:
92            self.cx.backup(bck, pages=-1, progress=progress)
93            self.verify_backup(bck)
94
95        self.assertEqual(len(journal), 1)
96        self.assertEqual(journal[0], 0)
97
98    def test_non_callable_progress(self):
99        with self.assertRaises(TypeError) as cm:
100            with sqlite.connect(':memory:') as bck:
101                self.cx.backup(bck, pages=1, progress='bar')
102        self.assertEqual(str(cm.exception), 'progress argument must be a callable')
103
104    def test_modifying_progress(self):
105        journal = []
106
107        def progress(status, remaining, total):
108            if not journal:
109                self.cx.execute('INSERT INTO foo (key) VALUES (?)', (remaining+1000,))
110                self.cx.commit()
111            journal.append(remaining)
112
113        with sqlite.connect(':memory:') as bck:
114            self.cx.backup(bck, pages=1, progress=progress)
115            self.verify_backup(bck)
116
117            result = bck.execute("SELECT key FROM foo"
118                                 " WHERE key >= 1000"
119                                 " ORDER BY key").fetchall()
120            self.assertEqual(result[0][0], 1001)
121
122        self.assertEqual(len(journal), 3)
123        self.assertEqual(journal[0], 1)
124        self.assertEqual(journal[1], 1)
125        self.assertEqual(journal[2], 0)
126
127    def test_failing_progress(self):
128        def progress(status, remaining, total):
129            raise SystemError('nearly out of space')
130
131        with self.assertRaises(SystemError) as err:
132            with sqlite.connect(':memory:') as bck:
133                self.cx.backup(bck, progress=progress)
134        self.assertEqual(str(err.exception), 'nearly out of space')
135
136    def test_database_source_name(self):
137        with sqlite.connect(':memory:') as bck:
138            self.cx.backup(bck, name='main')
139        with sqlite.connect(':memory:') as bck:
140            self.cx.backup(bck, name='temp')
141        with self.assertRaises(sqlite.OperationalError) as cm:
142            with sqlite.connect(':memory:') as bck:
143                self.cx.backup(bck, name='non-existing')
144        self.assertIn(
145            str(cm.exception),
146            ['SQL logic error', 'SQL logic error or missing database']
147        )
148
149        self.cx.execute("ATTACH DATABASE ':memory:' AS attached_db")
150        self.cx.execute('CREATE TABLE attached_db.foo (key INTEGER)')
151        self.cx.executemany('INSERT INTO attached_db.foo (key) VALUES (?)', [(3,), (4,)])
152        self.cx.commit()
153        with sqlite.connect(':memory:') as bck:
154            self.cx.backup(bck, name='attached_db')
155            self.verify_backup(bck)
156
157
158def suite():
159    return unittest.makeSuite(BackupTests)
160
161if __name__ == "__main__":
162    unittest.main()
163