1"""TestCases for exercising a Recno DB.
2"""
3
4import os, sys
5import errno
6from pprint import pprint
7import string
8import unittest
9
10from test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path
11
12
13#----------------------------------------------------------------------
14
15class SimpleRecnoTestCase(unittest.TestCase):
16    if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and
17            (sys.version_info < (3, 2))) :
18        def assertIsInstance(self, obj, datatype, msg=None) :
19            return self.assertEqual(type(obj), datatype, msg=msg)
20        def assertGreaterEqual(self, a, b, msg=None) :
21            return self.assertGreaterEqual(a, b, msg=msg)
22
23
24    def setUp(self):
25        self.filename = get_new_database_path()
26        self.homeDir = None
27
28    def tearDown(self):
29        test_support.unlink(self.filename)
30        if self.homeDir:
31            test_support.rmtree(self.homeDir)
32
33    def test01_basic(self):
34        d = db.DB()
35
36        get_returns_none = d.set_get_returns_none(2)
37        d.set_get_returns_none(get_returns_none)
38
39        d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
40
41        for x in string.ascii_letters:
42            recno = d.append(x * 60)
43            self.assertIsInstance(recno, int)
44            self.assertGreaterEqual(recno, 1)
45            if verbose:
46                print recno,
47
48        if verbose: print
49
50        stat = d.stat()
51        if verbose:
52            pprint(stat)
53
54        for recno in range(1, len(d)+1):
55            data = d[recno]
56            if verbose:
57                print data
58
59            self.assertIsInstance(data, str)
60            self.assertEqual(data, d.get(recno))
61
62        try:
63            data = d[0]  # This should raise a KeyError!?!?!
64        except db.DBInvalidArgError, val:
65            if sys.version_info < (2, 6) :
66                self.assertEqual(val[0], db.EINVAL)
67            else :
68                self.assertEqual(val.args[0], db.EINVAL)
69            if verbose: print val
70        else:
71            self.fail("expected exception")
72
73        # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2)
74        try:
75            d.has_key(0)
76        except db.DBError, val:
77            pass
78        else:
79            self.fail("has_key did not raise a proper exception")
80
81        try:
82            data = d[100]
83        except KeyError:
84            pass
85        else:
86            self.fail("expected exception")
87
88        try:
89            data = d.get(100)
90        except db.DBNotFoundError, val:
91            if get_returns_none:
92                self.fail("unexpected exception")
93        else:
94            self.assertEqual(data, None)
95
96        keys = d.keys()
97        if verbose:
98            print keys
99        self.assertIsInstance(keys, list)
100        self.assertIsInstance(keys[0], int)
101        self.assertEqual(len(keys), len(d))
102
103        items = d.items()
104        if verbose:
105            pprint(items)
106        self.assertIsInstance(items, list)
107        self.assertIsInstance(items[0], tuple)
108        self.assertEqual(len(items[0]), 2)
109        self.assertIsInstance(items[0][0], int)
110        self.assertIsInstance(items[0][1], str)
111        self.assertEqual(len(items), len(d))
112
113        self.assertTrue(d.has_key(25))
114
115        del d[25]
116        self.assertFalse(d.has_key(25))
117
118        d.delete(13)
119        self.assertFalse(d.has_key(13))
120
121        data = d.get_both(26, "z" * 60)
122        self.assertEqual(data, "z" * 60, 'was %r' % data)
123        if verbose:
124            print data
125
126        fd = d.fd()
127        if verbose:
128            print fd
129
130        c = d.cursor()
131        rec = c.first()
132        while rec:
133            if verbose:
134                print rec
135            rec = c.next()
136
137        c.set(50)
138        rec = c.current()
139        if verbose:
140            print rec
141
142        c.put(-1, "a replacement record", db.DB_CURRENT)
143
144        c.set(50)
145        rec = c.current()
146        self.assertEqual(rec, (50, "a replacement record"))
147        if verbose:
148            print rec
149
150        rec = c.set_range(30)
151        if verbose:
152            print rec
153
154        # test that non-existent key lookups work (and that
155        # DBC_set_range doesn't have a memleak under valgrind)
156        rec = c.set_range(999999)
157        self.assertEqual(rec, None)
158        if verbose:
159            print rec
160
161        c.close()
162        d.close()
163
164        d = db.DB()
165        d.open(self.filename)
166        c = d.cursor()
167
168        # put a record beyond the consecutive end of the recno's
169        d[100] = "way out there"
170        self.assertEqual(d[100], "way out there")
171
172        try:
173            data = d[99]
174        except KeyError:
175            pass
176        else:
177            self.fail("expected exception")
178
179        try:
180            d.get(99)
181        except db.DBKeyEmptyError, val:
182            if get_returns_none:
183                self.fail("unexpected DBKeyEmptyError exception")
184            else:
185                if sys.version_info < (2, 6) :
186                    self.assertEqual(val[0], db.DB_KEYEMPTY)
187                else :
188                    self.assertEqual(val.args[0], db.DB_KEYEMPTY)
189                if verbose: print val
190        else:
191            if not get_returns_none:
192                self.fail("expected exception")
193
194        rec = c.set(40)
195        while rec:
196            if verbose:
197                print rec
198            rec = c.next()
199
200        c.close()
201        d.close()
202
203    def test02_WithSource(self):
204        """
205        A Recno file that is given a "backing source file" is essentially a
206        simple ASCII file.  Normally each record is delimited by \n and so is
207        just a line in the file, but you can set a different record delimiter
208        if needed.
209        """
210        homeDir = get_new_environment_path()
211        self.homeDir = homeDir
212        source = os.path.join(homeDir, 'test_recno.txt')
213        if not os.path.isdir(homeDir):
214            os.mkdir(homeDir)
215        f = open(source, 'w') # create the file
216        f.close()
217
218        d = db.DB()
219        # This is the default value, just checking if both int
220        d.set_re_delim(0x0A)
221        d.set_re_delim('\n')  # and char can be used...
222        d.set_re_source(source)
223        d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
224
225        data = "The quick brown fox jumped over the lazy dog".split()
226        for datum in data:
227            d.append(datum)
228        d.sync()
229        d.close()
230
231        # get the text from the backing source
232        f = open(source, 'r')
233        text = f.read()
234        f.close()
235        text = text.strip()
236        if verbose:
237            print text
238            print data
239            print text.split('\n')
240
241        self.assertEqual(text.split('\n'), data)
242
243        # open as a DB again
244        d = db.DB()
245        d.set_re_source(source)
246        d.open(self.filename, db.DB_RECNO)
247
248        d[3] = 'reddish-brown'
249        d[8] = 'comatose'
250
251        d.sync()
252        d.close()
253
254        f = open(source, 'r')
255        text = f.read()
256        f.close()
257        text = text.strip()
258        if verbose:
259            print text
260            print text.split('\n')
261
262        self.assertEqual(text.split('\n'),
263           "The quick reddish-brown fox jumped over the comatose dog".split())
264
265    def test03_FixedLength(self):
266        d = db.DB()
267        d.set_re_len(40)  # fixed length records, 40 bytes long
268        d.set_re_pad('-') # sets the pad character...
269        d.set_re_pad(45)  # ...test both int and char
270        d.open(self.filename, db.DB_RECNO, db.DB_CREATE)
271
272        for x in string.ascii_letters:
273            d.append(x * 35)    # These will be padded
274
275        d.append('.' * 40)      # this one will be exact
276
277        try:                    # this one will fail
278            d.append('bad' * 20)
279        except db.DBInvalidArgError, val:
280            if sys.version_info < (2, 6) :
281                self.assertEqual(val[0], db.EINVAL)
282            else :
283                self.assertEqual(val.args[0], db.EINVAL)
284            if verbose: print val
285        else:
286            self.fail("expected exception")
287
288        c = d.cursor()
289        rec = c.first()
290        while rec:
291            if verbose:
292                print rec
293            rec = c.next()
294
295        c.close()
296        d.close()
297
298    def test04_get_size_empty(self) :
299        d = db.DB()
300        d.open(self.filename, dbtype=db.DB_RECNO, flags=db.DB_CREATE)
301
302        row_id = d.append(' ')
303        self.assertEqual(1, d.get_size(key=row_id))
304        row_id = d.append('')
305        self.assertEqual(0, d.get_size(key=row_id))
306
307
308
309
310
311#----------------------------------------------------------------------
312
313
def test_suite():
    """Return a suite holding every test method of SimpleRecnoTestCase."""
    # unittest.makeSuite() is deprecated and has been removed from recent
    # Python releases; the TestLoader call below is its direct equivalent.
    return unittest.TestLoader().loadTestsFromTestCase(SimpleRecnoTestCase)
316
317
# When executed as a script, run the suite returned by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
320