1#-----------------------------------------------------------------------
2# A test suite for the table interface built on bsddb.db
3#-----------------------------------------------------------------------
4#
5# Copyright (C) 2000, 2001 by Autonomous Zone Industries
6# Copyright (C) 2002 Gregory P. Smith
7#
8# March 20, 2000
9#
10# License:      This is free software.  You may use this software for any
11#               purpose including modification/redistribution, so long as
12#               this header remains intact and that you do not claim any
13#               rights of ownership or authorship of this software.  This
14#               software has been tested, but no warranty is expressed or
15#               implied.
16#
17#   --  Gregory P. Smith <greg@krypto.org>
18#
19# $Id$
20
21import os, re, sys
22
23if sys.version_info[0] < 3 :
24    try:
25        import cPickle
26        pickle = cPickle
27    except ImportError:
28        import pickle
29else :
30    import pickle
31
32import unittest
33from test_all import db, dbtables, test_support, verbose, \
34        get_new_environment_path, get_new_database_path
35
36#----------------------------------------------------------------------
37
38class TableDBTestCase(unittest.TestCase):
39    db_name = 'test-table.db'
40
41    def setUp(self):
42        import sys
43        if sys.version_info[0] >= 3 :
44            from test_all import do_proxy_db_py3k
45            self._flag_proxy_db_py3k = do_proxy_db_py3k(False)
46
47        self.testHomeDir = get_new_environment_path()
48        self.tdb = dbtables.bsdTableDB(
49            filename='tabletest.db', dbhome=self.testHomeDir, create=1)
50
51    def tearDown(self):
52        self.tdb.close()
53        import sys
54        if sys.version_info[0] >= 3 :
55            from test_all import do_proxy_db_py3k
56            do_proxy_db_py3k(self._flag_proxy_db_py3k)
57        test_support.rmtree(self.testHomeDir)
58
59    def test01(self):
60        tabname = "test01"
61        colname = 'cool numbers'
62        try:
63            self.tdb.Drop(tabname)
64        except dbtables.TableDBError:
65            pass
66        self.tdb.CreateTable(tabname, [colname])
67        import sys
68        if sys.version_info[0] < 3 :
69            self.tdb.Insert(tabname, {colname: pickle.dumps(3.14159, 1)})
70        else :
71            self.tdb.Insert(tabname, {colname: pickle.dumps(3.14159,
72                1).decode("iso8859-1")})  # 8 bits
73
74        if verbose:
75            self.tdb._db_print()
76
77        values = self.tdb.Select(
78            tabname, [colname], conditions={colname: None})
79
80        import sys
81        if sys.version_info[0] < 3 :
82            colval = pickle.loads(values[0][colname])
83        else :
84            colval = pickle.loads(bytes(values[0][colname], "iso8859-1"))
85        self.assertGreater(colval, 3.141)
86        self.assertLess(colval, 3.142)
87
88
89    def test02(self):
90        tabname = "test02"
91        col0 = 'coolness factor'
92        col1 = 'but can it fly?'
93        col2 = 'Species'
94
95        import sys
96        if sys.version_info[0] < 3 :
97            testinfo = [
98                {col0: pickle.dumps(8, 1), col1: 'no', col2: 'Penguin'},
99                {col0: pickle.dumps(-1, 1), col1: 'no', col2: 'Turkey'},
100                {col0: pickle.dumps(9, 1), col1: 'yes', col2: 'SR-71A Blackbird'}
101            ]
102        else :
103            testinfo = [
104                {col0: pickle.dumps(8, 1).decode("iso8859-1"),
105                    col1: 'no', col2: 'Penguin'},
106                {col0: pickle.dumps(-1, 1).decode("iso8859-1"),
107                    col1: 'no', col2: 'Turkey'},
108                {col0: pickle.dumps(9, 1).decode("iso8859-1"),
109                    col1: 'yes', col2: 'SR-71A Blackbird'}
110            ]
111
112        try:
113            self.tdb.Drop(tabname)
114        except dbtables.TableDBError:
115            pass
116        self.tdb.CreateTable(tabname, [col0, col1, col2])
117        for row in testinfo :
118            self.tdb.Insert(tabname, row)
119
120        import sys
121        if sys.version_info[0] < 3 :
122            values = self.tdb.Select(tabname, [col2],
123                conditions={col0: lambda x: pickle.loads(x) >= 8})
124        else :
125            values = self.tdb.Select(tabname, [col2],
126                conditions={col0: lambda x:
127                    pickle.loads(bytes(x, "iso8859-1")) >= 8})
128
129        self.assertEqual(len(values), 2)
130        if values[0]['Species'] == 'Penguin' :
131            self.assertEqual(values[1]['Species'], 'SR-71A Blackbird')
132        elif values[0]['Species'] == 'SR-71A Blackbird' :
133            self.assertEqual(values[1]['Species'], 'Penguin')
134        else :
135            if verbose:
136                print "values= %r" % (values,)
137            raise RuntimeError("Wrong values returned!")
138
139    def test03(self):
140        tabname = "test03"
141        try:
142            self.tdb.Drop(tabname)
143        except dbtables.TableDBError:
144            pass
145        if verbose:
146            print '...before CreateTable...'
147            self.tdb._db_print()
148        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
149        if verbose:
150            print '...after CreateTable...'
151            self.tdb._db_print()
152        self.tdb.Drop(tabname)
153        if verbose:
154            print '...after Drop...'
155            self.tdb._db_print()
156        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
157
158        try:
159            self.tdb.Insert(tabname,
160                            {'a': "",
161                             'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
162                             'f': "Zero"})
163            self.fail('Expected an exception')
164        except dbtables.TableDBError:
165            pass
166
167        try:
168            self.tdb.Select(tabname, [], conditions={'foo': '123'})
169            self.fail('Expected an exception')
170        except dbtables.TableDBError:
171            pass
172
173        self.tdb.Insert(tabname,
174                        {'a': '42',
175                         'b': "bad",
176                         'c': "meep",
177                         'e': 'Fuzzy wuzzy was a bear'})
178        self.tdb.Insert(tabname,
179                        {'a': '581750',
180                         'b': "good",
181                         'd': "bla",
182                         'c': "black",
183                         'e': 'fuzzy was here'})
184        self.tdb.Insert(tabname,
185                        {'a': '800000',
186                         'b': "good",
187                         'd': "bla",
188                         'c': "black",
189                         'e': 'Fuzzy wuzzy is a bear'})
190
191        if verbose:
192            self.tdb._db_print()
193
194        # this should return two rows
195        values = self.tdb.Select(tabname, ['b', 'a', 'd'],
196            conditions={'e': re.compile('wuzzy').search,
197                        'a': re.compile('^[0-9]+$').match})
198        self.assertEqual(len(values), 2)
199
200        # now lets delete one of them and try again
201        self.tdb.Delete(tabname, conditions={'b': dbtables.ExactCond('good')})
202        values = self.tdb.Select(
203            tabname, ['a', 'd', 'b'],
204            conditions={'e': dbtables.PrefixCond('Fuzzy')})
205        self.assertEqual(len(values), 1)
206        self.assertEqual(values[0]['d'], None)
207
208        values = self.tdb.Select(tabname, ['b'],
209            conditions={'c': lambda c: c == 'meep'})
210        self.assertEqual(len(values), 1)
211        self.assertEqual(values[0]['b'], "bad")
212
213
214    def test04_MultiCondSelect(self):
215        tabname = "test04_MultiCondSelect"
216        try:
217            self.tdb.Drop(tabname)
218        except dbtables.TableDBError:
219            pass
220        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e'])
221
222        try:
223            self.tdb.Insert(tabname,
224                            {'a': "",
225                             'e': pickle.dumps([{4:5, 6:7}, 'foo'], 1),
226                             'f': "Zero"})
227            self.fail('Expected an exception')
228        except dbtables.TableDBError:
229            pass
230
231        self.tdb.Insert(tabname, {'a': "A", 'b': "B", 'c': "C", 'd': "D",
232                                  'e': "E"})
233        self.tdb.Insert(tabname, {'a': "-A", 'b': "-B", 'c': "-C", 'd': "-D",
234                                  'e': "-E"})
235        self.tdb.Insert(tabname, {'a': "A-", 'b': "B-", 'c': "C-", 'd': "D-",
236                                  'e': "E-"})
237
238        if verbose:
239            self.tdb._db_print()
240
241        # This select should return 0 rows.  it is designed to test
242        # the bug identified and fixed in sourceforge bug # 590449
243        # (Big Thanks to "Rob Tillotson (n9mtb)" for tracking this down
244        # and supplying a fix!!  This one caused many headaches to say
245        # the least...)
246        values = self.tdb.Select(tabname, ['b', 'a', 'd'],
247            conditions={'e': dbtables.ExactCond('E'),
248                        'a': dbtables.ExactCond('A'),
249                        'd': dbtables.PrefixCond('-')
250                       } )
251        self.assertEqual(len(values), 0, values)
252
253
254    def test_CreateOrExtend(self):
255        tabname = "test_CreateOrExtend"
256
257        self.tdb.CreateOrExtendTable(
258            tabname, ['name', 'taste', 'filling', 'alcohol content', 'price'])
259        try:
260            self.tdb.Insert(tabname,
261                            {'taste': 'crap',
262                             'filling': 'no',
263                             'is it Guinness?': 'no'})
264            self.fail("Insert should've failed due to bad column name")
265        except:
266            pass
267        self.tdb.CreateOrExtendTable(tabname,
268                                     ['name', 'taste', 'is it Guinness?'])
269
270        # these should both succeed as the table should contain the union of both sets of columns.
271        self.tdb.Insert(tabname, {'taste': 'crap', 'filling': 'no',
272                                  'is it Guinness?': 'no'})
273        self.tdb.Insert(tabname, {'taste': 'great', 'filling': 'yes',
274                                  'is it Guinness?': 'yes',
275                                  'name': 'Guinness'})
276
277
278    def test_CondObjs(self):
279        tabname = "test_CondObjs"
280
281        self.tdb.CreateTable(tabname, ['a', 'b', 'c', 'd', 'e', 'p'])
282
283        self.tdb.Insert(tabname, {'a': "the letter A",
284                                  'b': "the letter B",
285                                  'c': "is for cookie"})
286        self.tdb.Insert(tabname, {'a': "is for aardvark",
287                                  'e': "the letter E",
288                                  'c': "is for cookie",
289                                  'd': "is for dog"})
290        self.tdb.Insert(tabname, {'a': "the letter A",
291                                  'e': "the letter E",
292                                  'c': "is for cookie",
293                                  'p': "is for Python"})
294
295        values = self.tdb.Select(
296            tabname, ['p', 'e'],
297            conditions={'e': dbtables.PrefixCond('the l')})
298        self.assertEqual(len(values), 2, values)
299        self.assertEqual(values[0]['e'], values[1]['e'], values)
300        self.assertNotEqual(values[0]['p'], values[1]['p'], values)
301
302        values = self.tdb.Select(
303            tabname, ['d', 'a'],
304            conditions={'a': dbtables.LikeCond('%aardvark%')})
305        self.assertEqual(len(values), 1, values)
306        self.assertEqual(values[0]['d'], "is for dog", values)
307        self.assertEqual(values[0]['a'], "is for aardvark", values)
308
309        values = self.tdb.Select(tabname, None,
310                                 {'b': dbtables.Cond(),
311                                  'e':dbtables.LikeCond('%letter%'),
312                                  'a':dbtables.PrefixCond('is'),
313                                  'd':dbtables.ExactCond('is for dog'),
314                                  'c':dbtables.PrefixCond('is for'),
315                                  'p':lambda s: not s})
316        self.assertEqual(len(values), 1, values)
317        self.assertEqual(values[0]['d'], "is for dog", values)
318        self.assertEqual(values[0]['a'], "is for aardvark", values)
319
320    def test_Delete(self):
321        tabname = "test_Delete"
322        self.tdb.CreateTable(tabname, ['x', 'y', 'z'])
323
324        # prior to 2001-05-09 there was a bug where Delete() would
325        # fail if it encountered any rows that did not have values in
326        # every column.
327        # Hunted and Squashed by <Donwulff> (Jukka Santala - donwulff@nic.fi)
328        self.tdb.Insert(tabname, {'x': 'X1', 'y':'Y1'})
329        self.tdb.Insert(tabname, {'x': 'X2', 'y':'Y2', 'z': 'Z2'})
330
331        self.tdb.Delete(tabname, conditions={'x': dbtables.PrefixCond('X')})
332        values = self.tdb.Select(tabname, ['y'],
333                                 conditions={'x': dbtables.PrefixCond('X')})
334        self.assertEqual(len(values), 0)
335
336    def test_Modify(self):
337        tabname = "test_Modify"
338        self.tdb.CreateTable(tabname, ['Name', 'Type', 'Access'])
339
340        self.tdb.Insert(tabname, {'Name': 'Index to MP3 files.doc',
341                                  'Type': 'Word', 'Access': '8'})
342        self.tdb.Insert(tabname, {'Name': 'Nifty.MP3', 'Access': '1'})
343        self.tdb.Insert(tabname, {'Type': 'Unknown', 'Access': '0'})
344
345        def set_type(type):
346            if type is None:
347                return 'MP3'
348            return type
349
350        def increment_access(count):
351            return str(int(count)+1)
352
353        def remove_value(value):
354            return None
355
356        self.tdb.Modify(tabname,
357                        conditions={'Access': dbtables.ExactCond('0')},
358                        mappings={'Access': remove_value})
359        self.tdb.Modify(tabname,
360                        conditions={'Name': dbtables.LikeCond('%MP3%')},
361                        mappings={'Type': set_type})
362        self.tdb.Modify(tabname,
363                        conditions={'Name': dbtables.LikeCond('%')},
364                        mappings={'Access': increment_access})
365
366        try:
367            self.tdb.Modify(tabname,
368                            conditions={'Name': dbtables.LikeCond('%')},
369                            mappings={'Access': 'What is your quest?'})
370        except TypeError:
371            # success, the string value in mappings isn't callable
372            pass
373        else:
374            raise RuntimeError, "why was TypeError not raised for bad callable?"
375
376        # Delete key in select conditions
377        values = self.tdb.Select(
378            tabname, None,
379            conditions={'Type': dbtables.ExactCond('Unknown')})
380        self.assertEqual(len(values), 1, values)
381        self.assertEqual(values[0]['Name'], None, values)
382        self.assertEqual(values[0]['Access'], None, values)
383
384        # Modify value by select conditions
385        values = self.tdb.Select(
386            tabname, None,
387            conditions={'Name': dbtables.ExactCond('Nifty.MP3')})
388        self.assertEqual(len(values), 1, values)
389        self.assertEqual(values[0]['Type'], "MP3", values)
390        self.assertEqual(values[0]['Access'], "2", values)
391
392        # Make sure change applied only to select conditions
393        values = self.tdb.Select(
394            tabname, None, conditions={'Name': dbtables.LikeCond('%doc%')})
395        self.assertEqual(len(values), 1, values)
396        self.assertEqual(values[0]['Type'], "Word", values)
397        self.assertEqual(values[0]['Access'], "9", values)
398
399
400def test_suite():
401    suite = unittest.TestSuite()
402    suite.addTest(unittest.makeSuite(TableDBTestCase))
403    return suite
404
405
406if __name__ == '__main__':
407    unittest.main(defaultTest='test_suite')
408