1import grp 2import mock_flimflam 3import os 4import pwd 5import stat 6import time 7import utils 8 9from autotest_lib.client.bin import test 10from autotest_lib.client.common_lib import error 11 12class network_ShillInitScripts(test.test): 13 """ Test that shill init scripts perform as expected. Use the 14 real filesystem (doing a best effort to archive and restore 15 current state). The shill manager is stopped and a proxy 16 DBus entity is installed to accept DBus messages that are sent 17 via "dbus-send" in the shill startup scripts. However, the 18 "real" shill is still also started from time to time and we 19 check that it is run with the right command line arguments. 20 """ 21 version = 1 22 save_directories = [ '/var/cache/shill', 23 '/run/shill', 24 '/run/state/logged-in', 25 '/run/dhcpcd', 26 '/var/lib/dhcpcd', 27 ] 28 fake_user = 'not-a-real-user@chromium.org' 29 saved_config = '/tmp/network_ShillInitScripts_saved_config.tgz' 30 cryptohome_path_command = 'cryptohome-path' 31 guest_shill_user_profile_dir = '/run/shill/guest_user_profile/shill' 32 guest_shill_user_log_dir = '/run/shill/guest_user_profile/shill_logs' 33 magic_header = '# --- shill init file test magic header ---' 34 35 36 def start_shill(self): 37 """ Starts a shill instance. """ 38 utils.start_service('shill') 39 40 41 def stop_shill(self): 42 """ Halt the running shill instance. """ 43 utils.stop_service('shill', ignore_status=True) 44 45 for attempt in range(10): 46 if not self.find_pid('shill'): 47 break 48 time.sleep(1) 49 else: 50 raise error.TestFail('Shill process does not appear to be dying') 51 52 53 def login(self, user=None): 54 """ Simulate the login process. 55 56 Note: "start" blocks until the "script" block completes. 57 58 @param user string user name (email address) to log in. 59 60 """ 61 62 if utils.has_systemd(): 63 start_cmd = (('systemctl set-environment CHROMEOS_USER=%s' 64 ' && systemctl start shill-start-user-session') % 65 (user or self.fake_user)) 66 else: 67 start_cmd = ('start shill-start-user-session CHROMEOS_USER=%s' % 68 (user or self.fake_user)) 69 utils.system(start_cmd) 70 71 72 def login_guest(self): 73 """ Simulate guest login. 74 75 For guest login, session-manager passes an empty CHROMEOS_USER arg. 76 77 """ 78 self.login('""') 79 80 81 def logout(self): 82 """ Simulate user logout. 83 84 Note: "start" blocks until the "script" block completes. 85 86 """ 87 utils.start_service('shill-stop-user-session') 88 89 90 def start_test(self): 91 """ Setup the start of the test. Stop shill and create test harness.""" 92 # Stop a system process on test duts for keeping connectivity up. 93 ret = utils.stop_service('recover_duts', ignore_status=True) 94 self.recover_duts_stopped = (ret == 0); 95 96 self.stop_shill() 97 98 # Deduce the root cryptohome directory name for our fake user. 99 self.root_cryptohome_dir = utils.system_output( 100 '%s system %s' % (self.cryptohome_path_command, self.fake_user)) 101 102 # Deduce the user cryptohome directory name for our fake user. 103 self.user_cryptohome_dir = utils.system_output( 104 '%s user %s' % (self.cryptohome_path_command, self.fake_user)) 105 106 # Deduce the directory for memory log storage. 107 self.user_cryptohome_log_dir = ('%s/shill_logs' % 108 self.root_cryptohome_dir) 109 110 # The sanitized hash of the username is the basename of the cryptohome. 111 self.fake_user_hash = os.path.basename(self.root_cryptohome_dir) 112 113 # Just in case this hash actually exists, add these to the list of 114 # saved directories. 115 self.save_directories.append(self.root_cryptohome_dir) 116 self.save_directories.append(self.user_cryptohome_dir) 117 118 # Archive the system state we will be modifying, then remove them. 119 utils.system('tar zcvf %s --directory / --ignore-failed-read %s' 120 ' 2>/dev/null' % 121 (self.saved_config, ' '.join(self.save_directories))) 122 utils.system('rm -rf %s' % ' '.join(self.save_directories), 123 ignore_status=True) 124 125 # Create the fake user's system cryptohome directory. 126 os.mkdir(self.root_cryptohome_dir) 127 self.new_shill_user_profile_dir = ('%s/shill' % 128 self.root_cryptohome_dir) 129 self.new_shill_user_profile = ('%s/shill.profile' % 130 self.new_shill_user_profile_dir) 131 132 # Create the fake user's user cryptohome directory. 133 os.mkdir(self.user_cryptohome_dir) 134 self.old_shill_user_profile_dir = ('%s/shill' % 135 self.user_cryptohome_dir) 136 self.old_shill_user_profile = ('%s/shill.profile' % 137 self.old_shill_user_profile_dir) 138 self.mock_flimflam = None 139 140 141 def start_mock_flimflam(self): 142 """ Start a mock flimflam instance to accept and log DBus calls. """ 143 self.mock_flimflam = mock_flimflam.MockFlimflam() 144 self.mock_flimflam.start() 145 146 147 def erase_state(self): 148 """ Remove all the test harness files. """ 149 utils.system('rm -rf %s' % ' '.join(self.save_directories)) 150 os.mkdir(self.root_cryptohome_dir) 151 os.mkdir(self.user_cryptohome_dir) 152 153 154 def end_test(self): 155 """ Perform cleanup at the end of the test. """ 156 if self.mock_flimflam: 157 self.mock_flimflam.quit() 158 self.mock_flimflam.join() 159 self.erase_state() 160 utils.system('tar zxvf %s --directory /' % self.saved_config) 161 utils.system('rm -f %s' % self.saved_config) 162 self.restart_system_processes() 163 164 165 def restart_system_processes(self): 166 """ Restart vital system services at the end of the test. """ 167 utils.start_service('shill', ignore_status=True) 168 if self.recover_duts_stopped: 169 utils.start_service('recover_duts', ignore_status=True) 170 171 172 def assure(self, must_be_true, assertion_name): 173 """ Perform a named assertion. 174 175 @param must_be_true boolean parameter that must be true. 176 @param assertion_name string name of this assertion. 177 178 """ 179 if not must_be_true: 180 raise error.TestFail('%s: Assertion failed: %s' % 181 (self.test_name, assertion_name)) 182 183 184 def assure_path_owner(self, path, owner): 185 """ Assert that |path| is owned by |owner|. 186 187 @param path string pathname to test. 188 @param owner string user name that should own |path|. 189 190 """ 191 self.assure(pwd.getpwuid(os.stat(path).st_uid)[0] == owner, 192 'Path %s is owned by %s' % (path, owner)) 193 194 195 def assure_path_group(self, path, group): 196 """ Assert that |path| is owned by |group|. 197 198 @param path string pathname to test. 199 @param group string group name that should own |path|. 200 201 """ 202 self.assure(grp.getgrgid(os.stat(path).st_gid)[0] == group, 203 'Path %s is group-owned by %s' % (path, group)) 204 205 206 def assure_exists(self, path, path_friendly_name): 207 """ Assert that |path| exists. 208 209 @param path string pathname to test. 210 @param path_friendly_name string user-parsable description of |path|. 211 212 """ 213 self.assure(os.path.exists(path), '%s exists' % path_friendly_name) 214 215 216 def assure_is_dir(self, path, path_friendly_name): 217 """ Assert that |path| is a directory. 218 219 @param path string pathname to test. 220 @param path_friendly_name string user-parsable description of |path|. 221 222 """ 223 self.assure_exists(path, path_friendly_name) 224 self.assure(stat.S_ISDIR(os.lstat(path).st_mode), 225 '%s is a directory' % path_friendly_name) 226 227 228 def assure_is_link(self, path, path_friendly_name): 229 """ Assert that |path| is a symbolic link. 230 231 @param path string pathname to test. 232 @param path_friendly_name string user-parsable description of |path|. 233 234 """ 235 self.assure_exists(path, path_friendly_name) 236 self.assure(stat.S_ISLNK(os.lstat(path).st_mode), 237 '%s is a symbolic link' % path_friendly_name) 238 239 240 def assure_is_link_to(self, path, pointee, path_friendly_name): 241 """ Assert that |path| is a symbolic link to |pointee|. 242 243 @param path string pathname to test. 244 @param pointee string pathname that |path| should point to. 245 @param path_friendly_name string user-parsable description of |path|. 246 247 """ 248 self.assure_is_link(path, path_friendly_name) 249 self.assure(os.readlink(path) == pointee, 250 '%s is a symbolic link to %s' % 251 (path_friendly_name, pointee)) 252 253 254 def assure_method_calls(self, expected_method_calls, assertion_name): 255 """ Assert that |expected_method_calls| were executed on mock_flimflam. 256 257 @param expected_method_calls list of string-tuple pairs of method 258 name + tuple of arguments. 259 @param assertion_name string name to assign to the assertion. 260 261 """ 262 method_calls = self.mock_flimflam.get_method_calls() 263 if len(expected_method_calls) != len(method_calls): 264 self.assure(False, '%s: method call count does not match' % 265 assertion_name) 266 for expected, actual in zip(expected_method_calls, method_calls): 267 self.assure(actual.method == expected[0], 268 '%s: method %s matches expected %s' % 269 (assertion_name, actual.method, expected[0])) 270 self.assure(actual.argument == expected[1], 271 '%s: argument %s matches expected %s' % 272 (assertion_name, actual.argument, expected[1])) 273 274 275 def create_file_with_contents(self, filename, contents): 276 """ Create a file named |filename| that contains |contents|. 277 278 @param filename string name of file. 279 @param contents string contents of file. 280 281 """ 282 with open(filename, 'w') as f: 283 f.write(contents) 284 285 286 def touch(self, filename): 287 """ Create an empty file named |filename|. 288 289 @param filename string name of file. 290 291 """ 292 self.create_file_with_contents(filename, '') 293 294 295 def create_new_shill_user_profile(self, contents): 296 """ Create a fake new user profile with |contents|. 297 298 @param contents string contents of the new user profile. 299 300 """ 301 os.mkdir(self.new_shill_user_profile_dir) 302 self.create_file_with_contents(self.new_shill_user_profile, contents) 303 304 305 def create_old_shill_user_profile(self, contents): 306 """ Create a fake old-style user profile with |contents|. 307 308 @param contents string contents of the old user profile. 309 310 """ 311 os.mkdir(self.old_shill_user_profile_dir) 312 self.create_file_with_contents(self.old_shill_user_profile, contents) 313 314 315 def file_contents(self, filename): 316 """ Returns the contents of |filename|. 317 318 @param filename string name of file to read. 319 320 """ 321 with open(filename) as f: 322 return f.read() 323 324 325 def find_pid(self, process_name): 326 """ Returns the process id of |process_name|. 327 328 @param process_name string name of process to search for. 329 330 """ 331 return utils.system_output('pgrep %s' % process_name, 332 ignore_status=True).split('\n')[0] 333 334 335 def get_commandline(self): 336 """ Returns the command line of the current shill executable. """ 337 pid = self.find_pid('shill') 338 return file('/proc/%s/cmdline' % pid).read().split('\0') 339 340 341 def run_once(self): 342 """ Main test loop. """ 343 try: 344 self.start_test() 345 except: 346 self.restart_system_processes() 347 raise 348 349 try: 350 self.run_tests([ 351 self.test_start_shill, 352 self.test_start_logged_in]) 353 354 # The tests above run a real instance of shill, whereas the tests 355 # below rely on a mock instance of shill. We must take care not 356 # to run the mock at the same time as a real shill instance. 357 self.start_mock_flimflam() 358 359 self.run_tests([ 360 self.test_login, 361 self.test_login_guest, 362 self.test_login_profile_exists, 363 self.test_login_old_shill_profile, 364 self.test_login_invalid_old_shill_profile, 365 self.test_login_ignore_old_shill_profile, 366 self.test_login_multi_profile, 367 self.test_logout]) 368 finally: 369 # Stop any shill instances started during testing. 370 self.stop_shill() 371 self.end_test() 372 373 374 def run_tests(self, tests): 375 """ Executes each of the test subparts in sequence. 376 377 @param tests list of methods to run. 378 379 """ 380 for test in tests: 381 self.test_name = test.__name__ 382 test() 383 self.stop_shill() 384 self.erase_state() 385 386 387 def test_start_shill(self): 388 """ Test all created pathnames during shill startup. 389 390 Also ensure the push argument is not provided by default. 391 392 """ 393 self.start_shill() 394 self.assure_is_dir('/run/shill', 'Shill run directory') 395 self.assure_is_dir('/var/lib/dhcpcd', 'dhcpcd lib directory') 396 self.assure_path_owner('/var/lib/dhcpcd', 'dhcp') 397 self.assure_path_group('/var/lib/dhcpcd', 'dhcp') 398 self.assure_is_dir('/run/dhcpcd', 'dhcpcd run directory') 399 self.assure_path_owner('/run/dhcpcd', 'dhcp') 400 self.assure_path_group('/run/dhcpcd', 'dhcp') 401 self.assure('--push=~chronos/shill' not in self.get_commandline(), 402 'Shill command line does not contain push argument') 403 404 405 def test_start_logged_in(self): 406 """ Tests starting up shill while a user is already logged in. 407 408 The "--push" argument should not be added even though shill is started 409 while a user is logged in. 410 411 """ 412 os.mkdir('/run/shill') 413 os.mkdir('/run/shill/user_profiles') 414 self.create_new_shill_user_profile('') 415 os.symlink(self.new_shill_user_profile_dir, 416 '/run/shill/user_profiles/chronos') 417 self.touch('/run/state/logged-in') 418 self.start_shill() 419 self.assure('--push=~chronos/shill' not in self.get_commandline(), 420 'Shill command line does not contain push argument') 421 os.unlink('/run/state/logged-in') 422 423 424 def test_login(self): 425 """ Test the login process. 426 427 Login should create a profile directory, then create and push 428 a user profile, given no previous state. 429 430 """ 431 os.mkdir('/run/shill') 432 self.login() 433 self.assure(not os.path.exists(self.old_shill_user_profile), 434 'Old shill user profile does not exist') 435 self.assure(not os.path.exists(self.new_shill_user_profile), 436 'New shill user profile does not exist') 437 # The DBus "CreateProfile" method should have been handled 438 # by our mock_flimflam instance, so the profile directory 439 # should not have actually been created. 440 self.assure_is_dir(self.new_shill_user_profile_dir, 441 'New shill user profile directory') 442 self.assure_is_dir('/run/shill/user_profiles', 443 'Shill profile root') 444 self.assure_is_link_to('/run/shill/user_profiles/chronos', 445 self.new_shill_user_profile_dir, 446 'Shill profile link') 447 self.assure_is_dir(self.user_cryptohome_log_dir, 448 'shill user log directory') 449 self.assure_is_link_to('/run/shill/log', 450 self.user_cryptohome_log_dir, 451 'Shill logs link') 452 self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ], 453 [ 'InsertUserProfile', 454 ('~chronos/shill', self.fake_user_hash) ]], 455 'CreateProfile and InsertUserProfile ' 456 'are called') 457 458 459 def test_login_guest(self): 460 """ Tests the guest login process. 461 462 Login should create a temporary profile directory in /run, 463 instead of using one within the root directory for normal users. 464 465 """ 466 os.mkdir('/run/shill') 467 self.login_guest() 468 self.assure(not os.path.exists(self.old_shill_user_profile), 469 'Old shill user profile does not exist') 470 self.assure(not os.path.exists(self.new_shill_user_profile), 471 'New shill user profile does not exist') 472 self.assure(not os.path.exists(self.new_shill_user_profile_dir), 473 'New shill user profile directory') 474 self.assure_is_dir(self.guest_shill_user_profile_dir, 475 'shill guest user profile directory') 476 self.assure_is_dir('/run/shill/user_profiles', 477 'Shill profile root') 478 self.assure_is_link_to('/run/shill/user_profiles/chronos', 479 self.guest_shill_user_profile_dir, 480 'Shill profile link') 481 self.assure_is_dir(self.guest_shill_user_log_dir, 482 'shill guest user log directory') 483 self.assure_is_link_to('/run/shill/log', 484 self.guest_shill_user_log_dir, 485 'Shill logs link') 486 self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ], 487 [ 'InsertUserProfile', 488 ('~chronos/shill', '') ]], 489 'CreateProfile and InsertUserProfile ' 490 'are called') 491 492 493 def test_login_profile_exists(self): 494 """ Test logging in a user whose profile already exists. 495 496 Login script should only push (and not create) the user profile 497 if a user profile already exists. 498 """ 499 os.mkdir('/run/shill') 500 os.mkdir(self.new_shill_user_profile_dir) 501 self.touch(self.new_shill_user_profile) 502 self.login() 503 self.assure_method_calls([[ 'InsertUserProfile', 504 ('~chronos/shill', self.fake_user_hash) ]], 505 'Only InsertUserProfile is called') 506 507 508 def test_login_old_shill_profile(self): 509 """ Test logging in a user with an old-style shill profile. 510 511 Login script should move an old shill user profile into place 512 if a new one does not exist. 513 """ 514 os.mkdir('/run/shill') 515 self.create_old_shill_user_profile(self.magic_header) 516 self.login() 517 self.assure(not os.path.exists(self.old_shill_user_profile), 518 'Old shill user profile no longer exists') 519 self.assure(not os.path.exists(self.old_shill_user_profile_dir), 520 'Old shill user profile directory no longer exists') 521 self.assure_exists(self.new_shill_user_profile, 522 'New shill profile') 523 self.assure(self.magic_header in 524 self.file_contents(self.new_shill_user_profile), 525 'Shill user profile contains our magic header') 526 self.assure_method_calls([[ 'InsertUserProfile', 527 ('~chronos/shill', self.fake_user_hash) ]], 528 'Only InsertUserProfile is called') 529 530 531 def make_symlink(self, path): 532 """ Create a symbolic link named |path|. 533 534 @param path string pathname of the symbolic link. 535 536 """ 537 os.symlink('/etc/hosts', path) 538 539 540 def make_special_file(self, path): 541 """ Create a special file named |path|. 542 543 @param path string pathname of the special file. 544 545 """ 546 os.mknod(path, stat.S_IFIFO) 547 548 549 def make_bad_owner(self, path): 550 """ Create a regular file with a strange ownership. 551 552 @param path string pathname of the file. 553 554 """ 555 self.touch(path) 556 os.lchown(path, 1000, 1000) 557 558 559 def test_login_invalid_old_shill_profile(self): 560 """ Test logging in with an invalid old-style shill profile. 561 562 Login script should ignore non-regular files or files not owned 563 by the correct user. The original file should be removed. 564 565 """ 566 os.mkdir('/run/shill') 567 for file_creation_method in (self.make_symlink, 568 self.make_special_file, 569 os.mkdir, 570 self.make_bad_owner): 571 os.mkdir(self.old_shill_user_profile_dir) 572 file_creation_method(self.old_shill_user_profile) 573 self.login() 574 self.assure(not os.path.exists(self.old_shill_user_profile), 575 'Old shill user profile no longer exists') 576 self.assure(not os.path.exists(self.old_shill_user_profile_dir), 577 'Old shill user profile directory no longer exists') 578 self.assure(not os.path.exists(self.new_shill_user_profile), 579 'New shill profile was not created') 580 self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ], 581 [ 'InsertUserProfile', 582 ('~chronos/shill', 583 self.fake_user_hash) ]], 584 'CreateProfile and InsertUserProfile ' 585 'are called') 586 os.unlink('/run/shill/user_profiles/chronos') 587 588 589 def test_login_ignore_old_shill_profile(self): 590 """ Test logging in with both an old and new profile present. 591 592 Login script should ignore an old shill user profile if a new one 593 exists. 594 595 """ 596 os.mkdir('/run/shill') 597 self.create_new_shill_user_profile('') 598 self.create_old_shill_user_profile(self.magic_header) 599 self.login() 600 self.assure(os.path.exists(self.old_shill_user_profile), 601 'Old shill user profile still exists') 602 self.assure_exists(self.new_shill_user_profile, 603 'New shill profile') 604 self.assure(self.magic_header not in 605 self.file_contents(self.new_shill_user_profile), 606 'Shill user profile does not contain our magic header') 607 self.assure_method_calls([[ 'InsertUserProfile', 608 ('~chronos/shill', self.fake_user_hash) ]], 609 'Only InsertUserProfile is called') 610 611 612 def test_login_multi_profile(self): 613 """ Test signalling shill about multiple logged-in users. 614 615 Login script should not create multiple profiles in parallel 616 if called more than once without an intervening logout. Only 617 the initial user profile should be created. 618 619 """ 620 os.mkdir('/run/shill') 621 self.create_new_shill_user_profile('') 622 623 # First logged-in user should create a profile (tested above). 624 self.login() 625 626 # Clear the mock method-call queue. 627 self.mock_flimflam.get_method_calls() 628 629 for attempt in range(5): 630 self.login() 631 self.assure_method_calls([], 'No more profiles are added to shill') 632 profile_links = os.listdir('/run/shill/user_profiles') 633 self.assure(len(profile_links) == 1, 'Only one profile exists') 634 self.assure(profile_links[0] == 'chronos', 635 'The profile link is for the chronos user') 636 self.assure_is_link_to('/run/shill/log', 637 self.user_cryptohome_log_dir, 638 'Shill log link for chronos') 639 640 641 def test_logout(self): 642 """ Test the logout process. """ 643 os.makedirs('/run/shill/user_profiles') 644 os.makedirs(self.guest_shill_user_profile_dir) 645 os.makedirs(self.guest_shill_user_log_dir) 646 self.touch('/run/state/logged-in') 647 self.logout() 648 self.assure(not os.path.exists('/run/shill/user_profiles'), 649 'User profile directory was removed') 650 self.assure(not os.path.exists(self.guest_shill_user_profile_dir), 651 'Guest user profile directory was removed') 652 self.assure(not os.path.exists(self.guest_shill_user_log_dir), 653 'Guest user log directory was removed') 654 self.assure_method_calls([[ 'PopAllUserProfiles', '' ]], 655 'PopAllUserProfiles is called') 656