1# pylint: disable-msg=C0111
2#!/usr/bin/python
3#
4# Copyright 2008 Google Inc. All Rights Reserved.
5
6"""Tests for action_common."""
7
8import unittest, sys, copy
9
10import common
11from autotest_lib.cli import cli_mock, action_common, rpc
12
13#
14# List action
15#
class atest_list_unittest(cli_mock.cli_unittest):
    """Tests for action_common.atest_list.

    Covers check_for_wildcard() (rewriting of trailing-'*' filters) and
    execute() against a mocked 'get_labels' RPC.
    """

    def test_check_for_wilcard_none(self):
        # No value contains a '*': filters and checks must stay unchanged.
        orig_filters = {'name__in': ['item0', 'item1']}
        orig_checks = {'name__in': ['item0', 'item1']}
        mytest = action_common.atest_list()

        filters = copy.deepcopy(orig_filters)
        checks = copy.deepcopy(orig_checks)
        mytest.check_for_wildcard(filters, checks)
        self.assertEqual(filters, orig_filters)
        self.assertEqual(checks, orig_checks)


    def test_check_for_wilcard_none_list(self):
        # Same as above, with a single-element list and no wildcard.
        orig_filters = {'name__in': ['item0']}
        orig_checks = {'name__in': ['item0']}
        mytest = action_common.atest_list()

        filters = copy.deepcopy(orig_filters)
        checks = copy.deepcopy(orig_checks)
        mytest.check_for_wildcard(filters, checks)
        self.assertEqual(filters, orig_filters)
        self.assertEqual(checks, orig_checks)


    def test_check_for_wilcard_one_list(self):
        # A single 'item*' inside an '__in' list is expected to be rewritten
        # in place to a '__startswith' filter, with its check set to None.
        filters = {'something__in': ['item*']}
        checks = {'something__in': ['item*']}
        mytest = action_common.atest_list()

        mytest.check_for_wildcard(filters, checks)
        self.assertEqual(filters, {'something__startswith': 'item'})
        self.assertEqual(checks, {'something__startswith': None})


    def test_check_for_wilcard_one_string(self):
        # A plain string value ending in '*' gets '__startswith' appended to
        # its key instead of replacing an '__in' suffix.
        filters = {'something__name': 'item*'}
        checks = {'something__name': 'item*'}
        mytest = action_common.atest_list()

        mytest.check_for_wildcard(filters, checks)
        self.assertEqual(filters, {'something__name__startswith': 'item'})
        self.assertEqual(checks, {'something__name__startswith': None})


    def test_check_for_wilcard_one_string_login(self):
        # Same string-valued case, using a '__login' key.
        filters = {'something__login': 'item*'}
        checks = {'something__login': 'item*'}
        mytest = action_common.atest_list()

        mytest.check_for_wildcard(filters, checks)
        self.assertEqual(filters, {'something__login__startswith': 'item'})
        self.assertEqual(checks, {'something__login__startswith': None})


    def test_check_for_wilcard_two(self):
        # Two wildcard values in one filter: the call is expected to end in
        # sys.exit(1) and to leave filters and checks untouched.
        orig_filters = {'something__in': ['item0*', 'item1*']}
        orig_checks = {'something__in': ['item0*', 'item1*']}
        mytest = action_common.atest_list()

        filters = copy.deepcopy(orig_filters)
        checks = copy.deepcopy(orig_checks)
        self.god.stub_function(sys, 'exit')
        sys.exit.expect_call(1).and_raises(cli_mock.ExitException)
        self.god.mock_io()
        self.assertRaises(cli_mock.ExitException,
                          mytest.check_for_wildcard, filters, checks)
        # The captured output is not inspected here; unmock_io() is still
        # called to undo mock_io().
        self.god.unmock_io()
        self.god.check_playback()
        self.assertEqual(filters, orig_filters)
        self.assertEqual(checks, orig_checks)


    def _atest_list_execute(self, filters=None, check_results=None):
        """Run atest_list.execute() against a mocked 'get_labels' RPC.

        The mocked RPC is set up to expect `filters` and to hand back two
        canned labels ('label0' and 'label1'); execute() must return exactly
        those values.

        @param filters: filters dict passed both to the RPC expectation and
                to execute(). Defaults to a fresh empty dict.
        @param check_results: check_results dict passed to execute().
                Defaults to a fresh empty dict.

        @return: the (out, err) pair returned by self.god.unmock_io().
        """
        # Use None sentinels instead of mutable default arguments so that no
        # dict instance is shared between calls.
        if filters is None:
            filters = {}
        if check_results is None:
            check_results = {}
        values = [{u'id': 180,
                   u'platform': 0,
                   u'name': u'label0',
                   u'invalid': 0,
                   u'kernel_config': u''},
                  {u'id': 338,
                   u'platform': 0,
                   u'name': u'label1',
                   u'invalid': 0,
                   u'kernel_config': u''}]
        mytest = action_common.atest_list()
        mytest.afe = rpc.afe_comm()
        self.mock_rpcs([('get_labels',
                         filters,
                         True,
                         values)])
        self.god.mock_io()
        self.assertEqual(values,
                         mytest.execute(op='get_labels',
                                        filters=filters,
                                        check_results=check_results))
        (out, err) = self.god.unmock_io()
        self.god.check_playback()
        return (out, err)


    def test_atest_list_execute_no_filters(self):
        # Default (empty) filters and checks.
        self._atest_list_execute()


    def test_atest_list_execute_filters_all_good(self):
        # Every requested name is among the returned labels: no error output.
        filters = {}
        check_results = {}
        filters['name__in'] = ['label0', 'label1']
        check_results['name__in'] = 'name'
        (out, err) = self._atest_list_execute(filters, check_results)
        self.assertEqual(err, '')


    def test_atest_list_execute_filters_good_and_bad(self):
        # 'label2' is requested but not returned: err is checked for the
        # words 'Unknown' and 'label2'.
        filters = {}
        check_results = {}
        filters['name__in'] = ['label0', 'label1', 'label2']
        check_results['name__in'] = 'name'
        (out, err) = self._atest_list_execute(filters, check_results)
        self.assertWords(err, ['Unknown', 'label2'])


    def test_atest_list_execute_items_good_and_bad_no_check(self):
        # Same request, but the check for 'name__in' is None: the missing
        # 'label2' must not produce any error output.
        filters = {}
        check_results = {}
        filters['name__in'] = ['label0', 'label1', 'label2']
        check_results['name__in'] = None
        (out, err) = self._atest_list_execute(filters, check_results)
        self.assertEqual(err, '')


    def test_atest_list_execute_filters_wildcard(self):
        # execute() is given 'label*' but the RPC is expected to receive the
        # rewritten {'name__startswith': 'label'} filter, so this test cannot
        # reuse _atest_list_execute (which expects the filters as passed).
        filters = {}
        check_results = {}
        filters['name__in'] = ['label*']
        check_results['name__in'] = 'name'
        values = [{u'id': 180,
                   u'platform': False,
                   u'name': u'label0',
                   u'invalid': False,
                   u'kernel_config': u''},
                  {u'id': 338,
                   u'platform': False,
                   u'name': u'label1',
                   u'invalid': False,
                   u'kernel_config': u''}]
        mytest = action_common.atest_list()
        mytest.afe = rpc.afe_comm()
        self.mock_rpcs([('get_labels', {'name__startswith': 'label'},
                         True, values)])
        self.god.mock_io()
        self.assertEqual(values,
                         mytest.execute(op='get_labels',
                                        filters=filters,
                                        check_results=check_results))
        (out, err) = self.god.unmock_io()
        self.god.check_playback()
        self.assertEqual(err, '')
174
175
176
177#
178# Creation & Deletion of a topic (ACL, label, user)
179#
class atest_create_or_delete_unittest(cli_mock.cli_unittest):
    """Tests for action_common.atest_create_or_delete.execute().

    Every test uses an 'add' action on the 'label' topic, so the expected
    RPC is 'add_label'.
    """

    def _create_cr_del(self, items):
        """Build an atest_create_or_delete object configured to add labels.

        @param items: the list that the object's get_items() will return.

        @return: the configured atest_create_or_delete instance.
        """
        def _items():
            return items
        crdel = action_common.atest_create_or_delete()
        crdel.afe = rpc.afe_comm()

        crdel.topic =  crdel.usage_topic = 'label'
        crdel.op_action = 'add'
        crdel.get_items = _items
        crdel.data['platform'] = False
        crdel.data_item_key = 'name'
        crdel.no_confirmation = True
        return crdel


    def test_execute_create_one_topic(self):
        # One item, one successful 'add_label' RPC.
        acr = self._create_cr_del(['label0'])
        self.mock_rpcs([('add_label',
                         {'name': 'label0', 'platform': False},
                         True, 42)])
        ret = acr.execute()
        self.god.check_playback()
        # The previous assert_(['label0'], ret) only tested the truthiness of
        # a non-empty list (ret was taken as the failure message), so it
        # could never fail. Compare the actual result instead.
        self.assertEqualNoOrder(['label0'], ret)


    def test_execute_create_two_topics(self):
        # Two items, one successful 'add_label' RPC each.
        acr = self._create_cr_del(['label0', 'label1'])
        self.mock_rpcs([('add_label',
                         {'name': 'label0', 'platform': False},
                         True, 42),
                        ('add_label',
                         {'name': 'label1', 'platform': False},
                         True, 43)])
        ret = acr.execute()
        self.god.check_playback()
        self.assertEqualNoOrder(['label0', 'label1'], ret)


    def test_execute_create_error(self):
        # The RPC is mocked as failing with a ValidationError message;
        # execute() must then return an empty list.
        acr = self._create_cr_del(['label0'])
        self.mock_rpcs([('add_label',
                         {'name': 'label0', 'platform': False},
                         False,
                         '''ValidationError:
                         {'name': 'This value must be unique (label0)'}''')])
        ret = acr.execute()
        self.god.check_playback()
        self.assertEqualNoOrder([], ret)
229
230
231
232#
# Adding or removing users or hosts from a topic (ACL or label)
234#
class atest_add_or_remove_unittest(cli_mock.cli_unittest):
    """Tests for action_common.atest_add_or_remove.

    Every test drives an 'add' action on the 'acl_group' topic, so the
    expected RPCs are 'acl_group_add_users' and 'acl_group_add_hosts'.

    Note: the user/host lists handed to the action and the lists placed in
    the RPC expectations are always separate list objects, never a shared
    variable.
    """

    @staticmethod
    def _users_rpc(acl, users, ok=True, result=None):
        """Return a mock_rpcs entry for an 'acl_group_add_users' call.

        @param acl: value expected under the 'id' key.
        @param users: value expected under the 'users' key.
        @param ok: third tuple element (presumably the success flag --
                confirm against cli_mock.mock_rpcs).
        @param result: fourth tuple element (return value or error text).
        """
        return ('acl_group_add_users', {'id': acl, 'users': users},
                ok, result)


    @staticmethod
    def _hosts_rpc(acl, hosts, ok=True, result=None):
        """Return a mock_rpcs entry for an 'acl_group_add_hosts' call.

        @param acl: value expected under the 'id' key.
        @param hosts: value expected under the 'hosts' key.
        @param ok: third tuple element (presumably the success flag --
                confirm against cli_mock.mock_rpcs).
        @param result: fourth tuple element (return value or error text).
        """
        return ('acl_group_add_hosts', {'id': acl, 'hosts': hosts},
                ok, result)


    def _create_add_remove(self, items, users=None, hosts=None):
        """Build an atest_add_or_remove object set up to add to an ACL.

        @param items: the single item that get_items() will return, wrapped
                in a one-element list.
        @param users: if truthy, stored as the object's `users` attribute.
        @param hosts: if truthy, stored as the object's `hosts` attribute.

        @return: the configured atest_add_or_remove instance.
        """
        action = action_common.atest_add_or_remove()
        action.afe = rpc.afe_comm()
        if users:
            action.users = users
        if hosts:
            action.hosts = hosts

        action.topic = 'acl_group'
        action.msg_topic = 'ACL'
        action.op_action = 'add'
        action.msg_done = 'Added to'
        action.get_items = lambda: [items]
        return action


    def test__add_remove_uh_to_topic(self):
        # Direct call of the private helper for the 'users' kind.
        action = self._create_add_remove('acl0', users=['user0', 'user1'])
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1'])]
        self.mock_rpcs(expected_rpcs)
        action._add_remove_uh_to_topic('acl0', 'users')
        self.god.check_playback()


    def test__add_remove_uh_to_topic_raise(self):
        # Only users were configured, so asking for 'hosts' is expected to
        # raise AttributeError.
        action = self._create_add_remove('acl0', users=['user0', 'user1'])
        helper = action._add_remove_uh_to_topic
        self.assertRaises(AttributeError, helper, 'acl0', 'hosts')


    def test_execute_add_or_remove_uh_to_topic_acl_users(self):
        # Users only, RPC succeeds.
        action = self._create_add_remove('acl0', users=['user0', 'user1'])
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1'])]
        self.mock_rpcs(expected_rpcs)
        result = action.execute()
        self.god.check_playback()
        self.assertEqualNoOrder(['acl0'], result['users'])
        self.assertEqual([], result['hosts'])


    def test_execute_add_or_remove_uh_to_topic_acl_users_hosts(self):
        # Users and hosts, both RPCs succeed.
        action = self._create_add_remove('acl0',
                                         users=['user0', 'user1'],
                                         hosts=['host0', 'host1'])
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1']),
                         self._hosts_rpc('acl0', ['host0', 'host1'])]
        self.mock_rpcs(expected_rpcs)
        result = action.execute()
        self.god.check_playback()
        self.assertEqualNoOrder(['acl0'], result['users'])
        self.assertEqualNoOrder(['acl0'], result['hosts'])


    def test_execute_add_or_remove_uh_to_topic_acl_bad_users(self):
        # The users RPC fails, naming every user as nonexistent.
        action = self._create_add_remove('acl0', users=['user0', 'user1'])
        failure = ('DoesNotExist: The following users do not exist: '
                   'user0, user1')
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1'],
                                         False, failure)]
        self.mock_rpcs(expected_rpcs)
        result = action.execute()
        self.god.check_playback()
        self.assertEqual([], result['users'])
        self.assertEqual([], result['hosts'])
        self.assertOutput(action, result,
                          err_words_ok=['DoesNotExist',
                                        'acl_group_add_users',
                                        'user0', 'user1'],
                          err_words_no=['acl_group_add_hosts'])


    def test_execute_add_or_remove_uh_to_topic_acl_bad_users_partial(self):
        # The first users RPC fails naming only user0; a second RPC with
        # just user1 is expected and succeeds.
        action = self._create_add_remove('acl0', users=['user0', 'user1'])
        failure = ('DoesNotExist: The following users do not exist: '
                   'user0')
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1'],
                                         False, failure),
                         self._users_rpc('acl0', ['user1'])]
        self.mock_rpcs(expected_rpcs)
        result = action.execute()
        self.god.check_playback()
        self.assertEqual(['acl0'], result['users'])
        self.assertEqual([], result['hosts'])
        self.assertOutput(action, result,
                          out_words_ok=["Added to ACL 'acl0'", 'user1'],
                          err_words_ok=['DoesNotExist',
                                        'acl_group_add_users',
                                        'user0'],
                          err_words_no=['acl_group_add_hosts'])


    def test_execute_add_or_remove_uh_to_topic_acl_bad_u_partial_kill(self):
        # Same partial failure, but with kill_on_failure set: sys.exit(1)
        # is expected instead of a retry.
        action = self._create_add_remove('acl0', users=['user0', 'user1'])
        action.kill_on_failure = True
        failure = ('DoesNotExist: The following users do not exist: '
                   'user0')
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1'],
                                         False, failure)]
        self.mock_rpcs(expected_rpcs)
        sys.exit.expect_call(1).and_raises(cli_mock.ExitException)
        self.god.mock_io()
        self.assertRaises(cli_mock.ExitException, action.execute)
        out, err = self.god.unmock_io()
        self.god.check_playback()
        self._check_output(out=out, err=err,
                           err_words_ok=['DoesNotExist',
                                         'acl_group_add_users',
                                         'user0'],
                           err_words_no=['acl_group_add_hosts'])


    def test_execute_add_or_remove_uh_to_topic_acl_bad_users_good_hosts(self):
        # The users RPC fails completely, the hosts RPC succeeds.
        action = self._create_add_remove('acl0',
                                         users=['user0', 'user1'],
                                         hosts=['host0', 'host1'])
        failure = ('DoesNotExist: The following users do not exist: '
                   'user0, user1')
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1'],
                                         False, failure),
                         self._hosts_rpc('acl0', ['host0', 'host1'])]
        self.mock_rpcs(expected_rpcs)

        result = action.execute()
        self.god.check_playback()
        self.assertEqual([], result['users'])
        self.assertEqual(['acl0'], result['hosts'])
        self.assertOutput(action, result,
                          out_words_ok=["Added to ACL 'acl0' hosts:",
                                        "host0", "host1"],
                          err_words_ok=['DoesNotExist',
                                        'acl_group_add_users',
                                        'user0', 'user1'],
                          err_words_no=['acl_group_add_hosts'])


    def test_execute_add_or_remove_uh_to_topic_acl_good_users_bad_hosts(self):
        # The users RPC succeeds, the hosts RPC fails completely. The ACL
        # name contains spaces.
        action = self._create_add_remove('acl0 with space',
                                         users=['user0', 'user1'],
                                         hosts=['host0', 'host1'])
        failure = ('DoesNotExist: The following hosts do not exist: '
                   'host0, host1')
        expected_rpcs = [self._users_rpc('acl0 with space',
                                         ['user0', 'user1']),
                         self._hosts_rpc('acl0 with space',
                                         ['host0', 'host1'],
                                         False, failure)]
        self.mock_rpcs(expected_rpcs)

        result = action.execute()
        self.god.check_playback()
        self.assertEqual(['acl0 with space'], result['users'])
        self.assertEqual([], result['hosts'])
        self.assertOutput(action, result,
                          out_words_ok=["Added to ACL 'acl0 with space' users:",
                                        "user0", "user1"],
                          err_words_ok=['DoesNotExist',
                                        'acl_group_add_hosts',
                                        'host0', 'host1'],
                          err_words_no=['acl_group_add_users'])


    def test_exe_add_or_remove_uh_to_topic_acl_good_u_bad_hosts_partial(self):
        # The users RPC succeeds; the first hosts RPC fails naming only
        # host1, and a second hosts RPC with just host0 succeeds.
        action = self._create_add_remove('acl0',
                                         users=['user0', 'user1'],
                                         hosts=['host0', 'host1'])
        failure = ('DoesNotExist: The following hosts do not exist: '
                   'host1')
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1']),
                         self._hosts_rpc('acl0', ['host0', 'host1'],
                                         False, failure),
                         self._hosts_rpc('acl0', ['host0'])]
        self.mock_rpcs(expected_rpcs)

        result = action.execute()
        self.god.check_playback()
        self.assertEqual(['acl0'], result['users'])
        self.assertEqual(['acl0'], result['hosts'])
        self.assertOutput(action, result,
                          out_words_ok=["Added to ACL 'acl0' users:",
                                        "user0", "user1", "host0"],
                          err_words_ok=['DoesNotExist',
                                        'acl_group_add_hosts',
                                        'host1'],
                          err_words_no=['acl_group_add_users'])


    def test_execute_add_or_remove_uh_to_topic_acl_bad_users_bad_hosts(self):
        # Both RPCs fail completely.
        action = self._create_add_remove('acl0',
                                         users=['user0', 'user1'],
                                         hosts=['host0', 'host1'])
        users_failure = ('DoesNotExist: The following users do not exist: '
                         'user0, user1')
        hosts_failure = ('DoesNotExist: The following hosts do not exist: '
                         'host0, host1')
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1'],
                                         False, users_failure),
                         self._hosts_rpc('acl0', ['host0', 'host1'],
                                         False, hosts_failure)]
        self.mock_rpcs(expected_rpcs)

        result = action.execute()
        self.god.check_playback()
        self.assertEqual([], result['users'])
        self.assertEqual([], result['hosts'])
        self.assertOutput(action, result,
                          err_words_ok=['DoesNotExist',
                                        'acl_group_add_hosts',
                                        'host0', 'host1',
                                        'acl_group_add_users',
                                        'user0', 'user1'])


    def test_execute_add_or_remove_uh_to_topic_acl_bad_u_bad_h_partial(self):
        # Both kinds fail partially on the first RPC and succeed on a second
        # RPC carrying only the remaining member.
        action = self._create_add_remove('acl0',
                                         users=['user0', 'user1'],
                                         hosts=['host0', 'host1'])
        users_failure = ('DoesNotExist: The following users do not exist: '
                         'user0')
        hosts_failure = ('DoesNotExist: The following hosts do not exist: '
                         'host1')
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1'],
                                         False, users_failure),
                         self._users_rpc('acl0', ['user1']),
                         self._hosts_rpc('acl0', ['host0', 'host1'],
                                         False, hosts_failure),
                         self._hosts_rpc('acl0', ['host0'])]
        self.mock_rpcs(expected_rpcs)
        result = action.execute()
        self.god.check_playback()
        self.assertEqual(['acl0'], result['users'])
        self.assertEqual(['acl0'], result['hosts'])
        self.assertOutput(action, result,
                          out_words_ok=["Added to ACL 'acl0' user:",
                                        "Added to ACL 'acl0' host:",
                                        'user1', 'host0'],
                          err_words_ok=['DoesNotExist',
                                        'acl_group_add_hosts',
                                        'host1',
                                        'acl_group_add_users',
                                        'user0'])


    def test_execute_add_or_remove_to_topic_bad_acl_uh(self):
        # Both RPCs fail because the ACL itself does not exist.
        action = self._create_add_remove('acl0',
                                         users=['user0', 'user1'],
                                         hosts=['host0', 'host1'])
        failure = ('DoesNotExist: acl_group matching '
                   'query does not exist.')
        expected_rpcs = [self._users_rpc('acl0', ['user0', 'user1'],
                                         False, failure),
                         self._hosts_rpc('acl0', ['host0', 'host1'],
                                         False, failure)]
        self.mock_rpcs(expected_rpcs)
        result = action.execute()
        self.god.check_playback()
        self.assertEqual([], result['users'])
        self.assertEqual([], result['hosts'])
        self.assertOutput(action, result,
                          err_words_ok=['DoesNotExist',
                                        'acl_group_add_hosts',
                                        'acl_group_add_users'])
564
565
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
568