1#----------------------------------------------------------------------- 2# A test suite for the table interface built on bsddb.db 3#----------------------------------------------------------------------- 4# 5# Copyright (C) 2000, 2001 by Autonomous Zone Industries 6# Copyright (C) 2002 Gregory P. Smith 7# 8# March 20, 2000 9# 10# License: This is free software. You may use this software for any 11# purpose including modification/redistribution, so long as 12# this header remains intact and that you do not claim any 13# rights of ownership or authorship of this software. This 14# software has been tested, but no warranty is expressed or 15# implied. 16# 17# -- Gregory P. Smith <greg@krypto.org> 18# 19# $Id$ 20 21import os, re, sys 22 23if sys.version_info[0] < 3 : 24 try: 25 import cPickle 26 pickle = cPickle 27 except ImportError: 28 import pickle 29else : 30 import pickle 31 32import unittest 33from test_all import db, dbtables, test_support, verbose, \ 34 get_new_environment_path, get_new_database_path 35 36#---------------------------------------------------------------------- 37 38class TableDBTestCase(unittest.TestCase): 39 db_name = 'test-table.db' 40 41 def setUp(self): 42 import sys 43 if sys.version_info[0] >= 3 : 44 from test_all import do_proxy_db_py3k 45 self._flag_proxy_db_py3k = do_proxy_db_py3k(False) 46 47 self.testHomeDir = get_new_environment_path() 48 self.tdb = dbtables.bsdTableDB( 49 filename='tabletest.db', dbhome=self.testHomeDir, create=1) 50 51 def tearDown(self): 52 self.tdb.close() 53 import sys 54 if sys.version_info[0] >= 3 : 55 from test_all import do_proxy_db_py3k 56 do_proxy_db_py3k(self._flag_proxy_db_py3k) 57 test_support.rmtree(self.testHomeDir) 58 59 def test01(self): 60 tabname = "test01" 61 colname = 'cool numbers' 62 try: 63 self.tdb.Drop(tabname) 64 except dbtables.TableDBError: 65 pass 66 self.tdb.CreateTable(tabname, [colname]) 67 import sys 68 if sys.version_info[0] < 3 : 69 self.tdb.Insert(tabname, {colname: pickle.dumps(3.14159, 1)}) 70 else : 71 self.tdb.Insert(tabname, {colname: pickle.dumps(3.14159, 72 1).decode("iso8859-1")}) # 8 bits 73 74 if verbose: 75 self.tdb._db_print() 76 77 values = self.tdb.Select( 78 tabname, [colname], conditions={colname: None}) 79 80 import sys 81 if sys.version_info[0] < 3 : 82 colval = pickle.loads(values[0][colname]) 83 else : 84 colval = pickle.loads(bytes(values[0][colname], "iso8859-1")) 85 self.assertGreater(colval, 3.141) 86 self.assertLess(colval, 3.142) 87 88 89 def test02(self): 90 tabname = "test02" 91 col0 = 'coolness factor' 92 col1 = 'but can it fly?' 93 col2 = 'Species' 94 95 import sys 96 if sys.version_info[0] < 3 : 97 testinfo = [ 98 {col0: pickle.dumps(8, 1), col1: 'no', col2: 'Penguin'}, 99 {col0: pickle.dumps(-1, 1), col1: 'no', col2: 'Turkey'}, 100 {col0: pickle.dumps(9, 1), col1: 'yes', col2: 'SR-71A Blackbird'} 101 ] 102 else : 103 testinfo = [ 104 {col0: pickle.dumps(8, 1).decode("iso8859-1"), 105 col1: 'no', col2: 'Penguin'}, 106 {col0: pickle.dumps(-1, 1).decode("iso8859-1"), 107 col1: 'no', col2: 'Turkey'}, 108 {col0: pickle.dumps(9, 1).decode("iso8859-1"), 109 col1: 'yes', col2: 'SR-71A Blackbird'} 110 ] 111 112 try: 113 self.tdb.Drop(tabname) 114 except dbtables.TableDBError: 115 pass 116 self.tdb.CreateTable(tabname, [col0, col1, col2]) 117 for row in testinfo : 118 self.tdb.Insert(tabname, row) 119 120 import sys 121 if sys.version_info[0] < 3 : 122 values = self.tdb.Select(tabname, [col2], 123 conditions={col0: lambda x: pickle.loads(x) >= 8}) 124 else : 125 values = self.tdb.Select(tabname, [col2], 126 conditions={col0: lambda x: 127 pickle.loads(bytes(x, "iso8859-1")) >= 8}) 128 129 self.assertEqual(len(values), 2) 130 if values[0]['Species'] == 'Penguin' : 131 self.assertEqual(values[1]['Species'], 'SR-71A Blackbird') 132 elif values[0]['Species'] == 'SR-71A Blackbird' : 133 self.assertEqual(values[1]['Species'], 'Penguin') 134 else : 135 if verbose: 136 print "values= %r" % (values,) 137 raise RuntimeError("Wrong values returned!") 138 139 def test03(self): 140 tabname = "test03" 141 try: 142 self.tdb.Drop(tabname) 143 except dbtables.TableDBError: 144 pass 145 if verbose: 146 print '...before CreateTable...' 147 self.tdb._db_print() 148 self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e']) 149 if verbose: 150 print '...after CreateTable...' 151 self.tdb._db_print() 152 self.tdb.Drop(tabname) 153 if verbose: 154 print '...after Drop...' 155 self.tdb._db_print() 156 self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e']) 157 158 try: 159 self.tdb.Insert(tabname, 160 {'a': "", 161 'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1), 162 'f': "Zero"}) 163 self.fail('Expected an exception') 164 except dbtables.TableDBError: 165 pass 166 167 try: 168 self.tdb.Select(tabname, [], conditions={'foo': '123'}) 169 self.fail('Expected an exception') 170 except dbtables.TableDBError: 171 pass 172 173 self.tdb.Insert(tabname, 174 {'a': '42', 175 'b': "bad", 176 'c': "meep", 177 'e': 'Fuzzy wuzzy was a bear'}) 178 self.tdb.Insert(tabname, 179 {'a': '581750', 180 'b': "good", 181 'd': "bla", 182 'c': "black", 183 'e': 'fuzzy was here'}) 184 self.tdb.Insert(tabname, 185 {'a': '800000', 186 'b': "good", 187 'd': "bla", 188 'c': "black", 189 'e': 'Fuzzy wuzzy is a bear'}) 190 191 if verbose: 192 self.tdb._db_print() 193 194 # this should return two rows 195 values = self.tdb.Select(tabname, ['b', 'a', 'd'], 196 conditions={'e': re.compile('wuzzy').search, 197 'a': re.compile('^[0-9]+$').match}) 198 self.assertEqual(len(values), 2) 199 200 # now lets delete one of them and try again 201 self.tdb.Delete(tabname, conditions={'b': dbtables.ExactCond('good')}) 202 values = self.tdb.Select( 203 tabname, ['a', 'd', 'b'], 204 conditions={'e': dbtables.PrefixCond('Fuzzy')}) 205 self.assertEqual(len(values), 1) 206 self.assertEqual(values[0]['d'], None) 207 208 values = self.tdb.Select(tabname, ['b'], 209 conditions={'c': lambda c: c == 'meep'}) 210 self.assertEqual(len(values), 1) 211 self.assertEqual(values[0]['b'], "bad") 212 213 214 def test04_MultiCondSelect(self): 215 tabname = "test04_MultiCondSelect" 216 try: 217 self.tdb.Drop(tabname) 218 except dbtables.TableDBError: 219 pass 220 self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e']) 221 222 try: 223 self.tdb.Insert(tabname, 224 {'a': "", 225 'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1), 226 'f': "Zero"}) 227 self.fail('Expected an exception') 228 except dbtables.TableDBError: 229 pass 230 231 self.tdb.Insert(tabname, {'a': "A", 'b': "B", 'c': "C", 'd': "D", 232 'e': "E"}) 233 self.tdb.Insert(tabname, {'a': "-A", 'b': "-B", 'c': "-C", 'd': "-D", 234 'e': "-E"}) 235 self.tdb.Insert(tabname, {'a': "A-", 'b': "B-", 'c': "C-", 'd': "D-", 236 'e': "E-"}) 237 238 if verbose: 239 self.tdb._db_print() 240 241 # This select should return 0 rows. it is designed to test 242 # the bug identified and fixed in sourceforge bug # 590449 243 # (Big Thanks to "Rob Tillotson (n9mtb)" for tracking this down 244 # and supplying a fix!! This one caused many headaches to say 245 # the least...) 246 values = self.tdb.Select(tabname, ['b', 'a', 'd'], 247 conditions={'e': dbtables.ExactCond('E'), 248 'a': dbtables.ExactCond('A'), 249 'd': dbtables.PrefixCond('-') 250 } ) 251 self.assertEqual(len(values), 0, values) 252 253 254 def test_CreateOrExtend(self): 255 tabname = "test_CreateOrExtend" 256 257 self.tdb.CreateOrExtendTable( 258 tabname, ['name', 'taste', 'filling', 'alcohol content', 'price']) 259 try: 260 self.tdb.Insert(tabname, 261 {'taste': 'crap', 262 'filling': 'no', 263 'is it Guinness?': 'no'}) 264 self.fail("Insert should've failed due to bad column name") 265 except: 266 pass 267 self.tdb.CreateOrExtendTable(tabname, 268 ['name', 'taste', 'is it Guinness?']) 269 270 # these should both succeed as the table should contain the union of both sets of columns. 271 self.tdb.Insert(tabname, {'taste': 'crap', 'filling': 'no', 272 'is it Guinness?': 'no'}) 273 self.tdb.Insert(tabname, {'taste': 'great', 'filling': 'yes', 274 'is it Guinness?': 'yes', 275 'name': 'Guinness'}) 276 277 278 def test_CondObjs(self): 279 tabname = "test_CondObjs" 280 281 self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e', 'p']) 282 283 self.tdb.Insert(tabname, {'a': "the letter A", 284 'b': "the letter B", 285 'c': "is for cookie"}) 286 self.tdb.Insert(tabname, {'a': "is for aardvark", 287 'e': "the letter E", 288 'c': "is for cookie", 289 'd': "is for dog"}) 290 self.tdb.Insert(tabname, {'a': "the letter A", 291 'e': "the letter E", 292 'c': "is for cookie", 293 'p': "is for Python"}) 294 295 values = self.tdb.Select( 296 tabname, ['p', 'e'], 297 conditions={'e': dbtables.PrefixCond('the l')}) 298 self.assertEqual(len(values), 2, values) 299 self.assertEqual(values[0]['e'], values[1]['e'], values) 300 self.assertNotEqual(values[0]['p'], values[1]['p'], values) 301 302 values = self.tdb.Select( 303 tabname, ['d', 'a'], 304 conditions={'a': dbtables.LikeCond('%aardvark%')}) 305 self.assertEqual(len(values), 1, values) 306 self.assertEqual(values[0]['d'], "is for dog", values) 307 self.assertEqual(values[0]['a'], "is for aardvark", values) 308 309 values = self.tdb.Select(tabname, None, 310 {'b': dbtables.Cond(), 311 'e':dbtables.LikeCond('%letter%'), 312 'a':dbtables.PrefixCond('is'), 313 'd':dbtables.ExactCond('is for dog'), 314 'c':dbtables.PrefixCond('is for'), 315 'p':lambda s: not s}) 316 self.assertEqual(len(values), 1, values) 317 self.assertEqual(values[0]['d'], "is for dog", values) 318 self.assertEqual(values[0]['a'], "is for aardvark", values) 319 320 def test_Delete(self): 321 tabname = "test_Delete" 322 self.tdb.CreateTable(tabname, ['x', 'y', 'z']) 323 324 # prior to 2001-05-09 there was a bug where Delete() would 325 # fail if it encountered any rows that did not have values in 326 # every column. 327 # Hunted and Squashed by <Donwulff> (Jukka Santala - donwulff@nic.fi) 328 self.tdb.Insert(tabname, {'x': 'X1', 'y':'Y1'}) 329 self.tdb.Insert(tabname, {'x': 'X2', 'y':'Y2', 'z': 'Z2'}) 330 331 self.tdb.Delete(tabname, conditions={'x': dbtables.PrefixCond('X')}) 332 values = self.tdb.Select(tabname, ['y'], 333 conditions={'x': dbtables.PrefixCond('X')}) 334 self.assertEqual(len(values), 0) 335 336 def test_Modify(self): 337 tabname = "test_Modify" 338 self.tdb.CreateTable(tabname, ['Name', 'Type', 'Access']) 339 340 self.tdb.Insert(tabname, {'Name': 'Index to MP3 files.doc', 341 'Type': 'Word', 'Access': '8'}) 342 self.tdb.Insert(tabname, {'Name': 'Nifty.MP3', 'Access': '1'}) 343 self.tdb.Insert(tabname, {'Type': 'Unknown', 'Access': '0'}) 344 345 def set_type(type): 346 if type is None: 347 return 'MP3' 348 return type 349 350 def increment_access(count): 351 return str(int(count)+1) 352 353 def remove_value(value): 354 return None 355 356 self.tdb.Modify(tabname, 357 conditions={'Access': dbtables.ExactCond('0')}, 358 mappings={'Access': remove_value}) 359 self.tdb.Modify(tabname, 360 conditions={'Name': dbtables.LikeCond('%MP3%')}, 361 mappings={'Type': set_type}) 362 self.tdb.Modify(tabname, 363 conditions={'Name': dbtables.LikeCond('%')}, 364 mappings={'Access': increment_access}) 365 366 try: 367 self.tdb.Modify(tabname, 368 conditions={'Name': dbtables.LikeCond('%')}, 369 mappings={'Access': 'What is your quest?'}) 370 except TypeError: 371 # success, the string value in mappings isn't callable 372 pass 373 else: 374 raise RuntimeError, "why was TypeError not raised for bad callable?" 375 376 # Delete key in select conditions 377 values = self.tdb.Select( 378 tabname, None, 379 conditions={'Type': dbtables.ExactCond('Unknown')}) 380 self.assertEqual(len(values), 1, values) 381 self.assertEqual(values[0]['Name'], None, values) 382 self.assertEqual(values[0]['Access'], None, values) 383 384 # Modify value by select conditions 385 values = self.tdb.Select( 386 tabname, None, 387 conditions={'Name': dbtables.ExactCond('Nifty.MP3')}) 388 self.assertEqual(len(values), 1, values) 389 self.assertEqual(values[0]['Type'], "MP3", values) 390 self.assertEqual(values[0]['Access'], "2", values) 391 392 # Make sure change applied only to select conditions 393 values = self.tdb.Select( 394 tabname, None, conditions={'Name': dbtables.LikeCond('%doc%')}) 395 self.assertEqual(len(values), 1, values) 396 self.assertEqual(values[0]['Type'], "Word", values) 397 self.assertEqual(values[0]['Access'], "9", values) 398 399 400def test_suite(): 401 suite = unittest.TestSuite() 402 suite.addTest(unittest.makeSuite(TableDBTestCase)) 403 return suite 404 405 406if __name__ == '__main__': 407 unittest.main(defaultTest='test_suite') 408