1from test import support
2gdbm = support.import_module("dbm.gnu") #skip if not supported
3import unittest
4import os
5from test.support import TESTFN, TESTFN_NONASCII, unlink
6
7
8filename = TESTFN
9
10class TestGdbm(unittest.TestCase):
11    def setUp(self):
12        self.g = None
13
14    def tearDown(self):
15        if self.g is not None:
16            self.g.close()
17        unlink(filename)
18
19    def test_key_methods(self):
20        self.g = gdbm.open(filename, 'c')
21        self.assertEqual(self.g.keys(), [])
22        self.g['a'] = 'b'
23        self.g['12345678910'] = '019237410982340912840198242'
24        self.g[b'bytes'] = b'data'
25        key_set = set(self.g.keys())
26        self.assertEqual(key_set, set([b'a', b'bytes', b'12345678910']))
27        self.assertIn('a', self.g)
28        self.assertIn(b'a', self.g)
29        self.assertEqual(self.g[b'bytes'], b'data')
30        key = self.g.firstkey()
31        while key:
32            self.assertIn(key, key_set)
33            key_set.remove(key)
34            key = self.g.nextkey(key)
35        # get() and setdefault() work as in the dict interface
36        self.assertEqual(self.g.get(b'a'), b'b')
37        self.assertIsNone(self.g.get(b'xxx'))
38        self.assertEqual(self.g.get(b'xxx', b'foo'), b'foo')
39        with self.assertRaises(KeyError):
40            self.g['xxx']
41        self.assertEqual(self.g.setdefault(b'xxx', b'foo'), b'foo')
42        self.assertEqual(self.g[b'xxx'], b'foo')
43
44    def test_error_conditions(self):
45        # Try to open a non-existent database.
46        unlink(filename)
47        self.assertRaises(gdbm.error, gdbm.open, filename, 'r')
48        # Try to access a closed database.
49        self.g = gdbm.open(filename, 'c')
50        self.g.close()
51        self.assertRaises(gdbm.error, lambda: self.g['a'])
52        # try pass an invalid open flag
53        self.assertRaises(gdbm.error, lambda: gdbm.open(filename, 'rx').close())
54
55    def test_flags(self):
56        # Test the flag parameter open() by trying all supported flag modes.
57        all = set(gdbm.open_flags)
58        # Test standard flags (presumably "crwn").
59        modes = all - set('fsu')
60        for mode in sorted(modes):  # put "c" mode first
61            self.g = gdbm.open(filename, mode)
62            self.g.close()
63
64        # Test additional flags (presumably "fsu").
65        flags = all - set('crwn')
66        for mode in modes:
67            for flag in flags:
68                self.g = gdbm.open(filename, mode + flag)
69                self.g.close()
70
71    def test_reorganize(self):
72        self.g = gdbm.open(filename, 'c')
73        size0 = os.path.getsize(filename)
74
75        # bpo-33901: on macOS with gdbm 1.15, an empty database uses 16 MiB
76        # and adding an entry of 10,000 B has no effect on the file size.
77        # Add size0 bytes to make sure that the file size changes.
78        value_size = max(size0, 10000)
79        self.g['x'] = 'x' * value_size
80        size1 = os.path.getsize(filename)
81        self.assertGreater(size1, size0)
82
83        del self.g['x']
84        # 'size' is supposed to be the same even after deleting an entry.
85        self.assertEqual(os.path.getsize(filename), size1)
86
87        self.g.reorganize()
88        size2 = os.path.getsize(filename)
89        self.assertLess(size2, size1)
90        self.assertGreaterEqual(size2, size0)
91
92    def test_context_manager(self):
93        with gdbm.open(filename, 'c') as db:
94            db["gdbm context manager"] = "context manager"
95
96        with gdbm.open(filename, 'r') as db:
97            self.assertEqual(list(db.keys()), [b"gdbm context manager"])
98
99        with self.assertRaises(gdbm.error) as cm:
100            db.keys()
101        self.assertEqual(str(cm.exception),
102                         "GDBM object has already been closed")
103
104    def test_bytes(self):
105        with gdbm.open(filename, 'c') as db:
106            db[b'bytes key \xbd'] = b'bytes value \xbd'
107        with gdbm.open(filename, 'r') as db:
108            self.assertEqual(list(db.keys()), [b'bytes key \xbd'])
109            self.assertTrue(b'bytes key \xbd' in db)
110            self.assertEqual(db[b'bytes key \xbd'], b'bytes value \xbd')
111
112    def test_unicode(self):
113        with gdbm.open(filename, 'c') as db:
114            db['Unicode key \U0001f40d'] = 'Unicode value \U0001f40d'
115        with gdbm.open(filename, 'r') as db:
116            self.assertEqual(list(db.keys()), ['Unicode key \U0001f40d'.encode()])
117            self.assertTrue('Unicode key \U0001f40d'.encode() in db)
118            self.assertTrue('Unicode key \U0001f40d' in db)
119            self.assertEqual(db['Unicode key \U0001f40d'.encode()],
120                             'Unicode value \U0001f40d'.encode())
121            self.assertEqual(db['Unicode key \U0001f40d'],
122                             'Unicode value \U0001f40d'.encode())
123
124    @unittest.skipUnless(TESTFN_NONASCII,
125                         'requires OS support of non-ASCII encodings')
126    def test_nonascii_filename(self):
127        filename = TESTFN_NONASCII
128        self.addCleanup(unlink, filename)
129        with gdbm.open(filename, 'c') as db:
130            db[b'key'] = b'value'
131        self.assertTrue(os.path.exists(filename))
132        with gdbm.open(filename, 'r') as db:
133            self.assertEqual(list(db.keys()), [b'key'])
134            self.assertTrue(b'key' in db)
135            self.assertEqual(db[b'key'], b'value')
136
137
138if __name__ == '__main__':
139    unittest.main()
140