1"""TestCases for exercising a Recno DB. 2""" 3 4import os, sys 5import errno 6from pprint import pprint 7import string 8import unittest 9 10from test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path 11 12 13#---------------------------------------------------------------------- 14 15class SimpleRecnoTestCase(unittest.TestCase): 16 if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and 17 (sys.version_info < (3, 2))) : 18 def assertIsInstance(self, obj, datatype, msg=None) : 19 return self.assertEqual(type(obj), datatype, msg=msg) 20 def assertGreaterEqual(self, a, b, msg=None) : 21 return self.assertGreaterEqual(a, b, msg=msg) 22 23 24 def setUp(self): 25 self.filename = get_new_database_path() 26 self.homeDir = None 27 28 def tearDown(self): 29 test_support.unlink(self.filename) 30 if self.homeDir: 31 test_support.rmtree(self.homeDir) 32 33 def test01_basic(self): 34 d = db.DB() 35 36 get_returns_none = d.set_get_returns_none(2) 37 d.set_get_returns_none(get_returns_none) 38 39 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) 40 41 for x in string.ascii_letters: 42 recno = d.append(x * 60) 43 self.assertIsInstance(recno, int) 44 self.assertGreaterEqual(recno, 1) 45 if verbose: 46 print recno, 47 48 if verbose: print 49 50 stat = d.stat() 51 if verbose: 52 pprint(stat) 53 54 for recno in range(1, len(d)+1): 55 data = d[recno] 56 if verbose: 57 print data 58 59 self.assertIsInstance(data, str) 60 self.assertEqual(data, d.get(recno)) 61 62 try: 63 data = d[0] # This should raise a KeyError!?!?! 64 except db.DBInvalidArgError, val: 65 if sys.version_info < (2, 6) : 66 self.assertEqual(val[0], db.EINVAL) 67 else : 68 self.assertEqual(val.args[0], db.EINVAL) 69 if verbose: print val 70 else: 71 self.fail("expected exception") 72 73 # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2) 74 try: 75 d.has_key(0) 76 except db.DBError, val: 77 pass 78 else: 79 self.fail("has_key did not raise a proper exception") 80 81 try: 82 data = d[100] 83 except KeyError: 84 pass 85 else: 86 self.fail("expected exception") 87 88 try: 89 data = d.get(100) 90 except db.DBNotFoundError, val: 91 if get_returns_none: 92 self.fail("unexpected exception") 93 else: 94 self.assertEqual(data, None) 95 96 keys = d.keys() 97 if verbose: 98 print keys 99 self.assertIsInstance(keys, list) 100 self.assertIsInstance(keys[0], int) 101 self.assertEqual(len(keys), len(d)) 102 103 items = d.items() 104 if verbose: 105 pprint(items) 106 self.assertIsInstance(items, list) 107 self.assertIsInstance(items[0], tuple) 108 self.assertEqual(len(items[0]), 2) 109 self.assertIsInstance(items[0][0], int) 110 self.assertIsInstance(items[0][1], str) 111 self.assertEqual(len(items), len(d)) 112 113 self.assertTrue(d.has_key(25)) 114 115 del d[25] 116 self.assertFalse(d.has_key(25)) 117 118 d.delete(13) 119 self.assertFalse(d.has_key(13)) 120 121 data = d.get_both(26, "z" * 60) 122 self.assertEqual(data, "z" * 60, 'was %r' % data) 123 if verbose: 124 print data 125 126 fd = d.fd() 127 if verbose: 128 print fd 129 130 c = d.cursor() 131 rec = c.first() 132 while rec: 133 if verbose: 134 print rec 135 rec = c.next() 136 137 c.set(50) 138 rec = c.current() 139 if verbose: 140 print rec 141 142 c.put(-1, "a replacement record", db.DB_CURRENT) 143 144 c.set(50) 145 rec = c.current() 146 self.assertEqual(rec, (50, "a replacement record")) 147 if verbose: 148 print rec 149 150 rec = c.set_range(30) 151 if verbose: 152 print rec 153 154 # test that non-existent key lookups work (and that 155 # DBC_set_range doesn't have a memleak under valgrind) 156 rec = c.set_range(999999) 157 self.assertEqual(rec, None) 158 if verbose: 159 print rec 160 161 c.close() 162 d.close() 163 164 d = db.DB() 165 d.open(self.filename) 166 c = d.cursor() 167 168 # put a record beyond the consecutive end of the recno's 169 d[100] = "way out there" 170 self.assertEqual(d[100], "way out there") 171 172 try: 173 data = d[99] 174 except KeyError: 175 pass 176 else: 177 self.fail("expected exception") 178 179 try: 180 d.get(99) 181 except db.DBKeyEmptyError, val: 182 if get_returns_none: 183 self.fail("unexpected DBKeyEmptyError exception") 184 else: 185 if sys.version_info < (2, 6) : 186 self.assertEqual(val[0], db.DB_KEYEMPTY) 187 else : 188 self.assertEqual(val.args[0], db.DB_KEYEMPTY) 189 if verbose: print val 190 else: 191 if not get_returns_none: 192 self.fail("expected exception") 193 194 rec = c.set(40) 195 while rec: 196 if verbose: 197 print rec 198 rec = c.next() 199 200 c.close() 201 d.close() 202 203 def test02_WithSource(self): 204 """ 205 A Recno file that is given a "backing source file" is essentially a 206 simple ASCII file. Normally each record is delimited by \n and so is 207 just a line in the file, but you can set a different record delimiter 208 if needed. 209 """ 210 homeDir = get_new_environment_path() 211 self.homeDir = homeDir 212 source = os.path.join(homeDir, 'test_recno.txt') 213 if not os.path.isdir(homeDir): 214 os.mkdir(homeDir) 215 f = open(source, 'w') # create the file 216 f.close() 217 218 d = db.DB() 219 # This is the default value, just checking if both int 220 d.set_re_delim(0x0A) 221 d.set_re_delim('\n') # and char can be used... 222 d.set_re_source(source) 223 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) 224 225 data = "The quick brown fox jumped over the lazy dog".split() 226 for datum in data: 227 d.append(datum) 228 d.sync() 229 d.close() 230 231 # get the text from the backing source 232 f = open(source, 'r') 233 text = f.read() 234 f.close() 235 text = text.strip() 236 if verbose: 237 print text 238 print data 239 print text.split('\n') 240 241 self.assertEqual(text.split('\n'), data) 242 243 # open as a DB again 244 d = db.DB() 245 d.set_re_source(source) 246 d.open(self.filename, db.DB_RECNO) 247 248 d[3] = 'reddish-brown' 249 d[8] = 'comatose' 250 251 d.sync() 252 d.close() 253 254 f = open(source, 'r') 255 text = f.read() 256 f.close() 257 text = text.strip() 258 if verbose: 259 print text 260 print text.split('\n') 261 262 self.assertEqual(text.split('\n'), 263 "The quick reddish-brown fox jumped over the comatose dog".split()) 264 265 def test03_FixedLength(self): 266 d = db.DB() 267 d.set_re_len(40) # fixed length records, 40 bytes long 268 d.set_re_pad('-') # sets the pad character... 269 d.set_re_pad(45) # ...test both int and char 270 d.open(self.filename, db.DB_RECNO, db.DB_CREATE) 271 272 for x in string.ascii_letters: 273 d.append(x * 35) # These will be padded 274 275 d.append('.' * 40) # this one will be exact 276 277 try: # this one will fail 278 d.append('bad' * 20) 279 except db.DBInvalidArgError, val: 280 if sys.version_info < (2, 6) : 281 self.assertEqual(val[0], db.EINVAL) 282 else : 283 self.assertEqual(val.args[0], db.EINVAL) 284 if verbose: print val 285 else: 286 self.fail("expected exception") 287 288 c = d.cursor() 289 rec = c.first() 290 while rec: 291 if verbose: 292 print rec 293 rec = c.next() 294 295 c.close() 296 d.close() 297 298 def test04_get_size_empty(self) : 299 d = db.DB() 300 d.open(self.filename, dbtype=db.DB_RECNO, flags=db.DB_CREATE) 301 302 row_id = d.append(' ') 303 self.assertEqual(1, d.get_size(key=row_id)) 304 row_id = d.append('') 305 self.assertEqual(0, d.get_size(key=row_id)) 306 307 308 309 310 311#---------------------------------------------------------------------- 312 313 314def test_suite(): 315 return unittest.makeSuite(SimpleRecnoTestCase) 316 317 318if __name__ == '__main__': 319 unittest.main(defaultTest='test_suite') 320