1import grp
2import mock_flimflam
3import os
4import pwd
5import stat
6import time
7import utils
8
9from autotest_lib.client.bin import test
10from autotest_lib.client.common_lib import error
11
12class network_ShillInitScripts(test.test):
13    """ Test that shill init scripts perform as expected.  Use the
14        real filesystem (doing a best effort to archive and restore
15        current state).  The shill manager is stopped and a proxy
16        DBus entity is installed to accept DBus messages that are sent
17        via "dbus-send" in the shill startup scripts.  However, the
18        "real" shill is still also started from time to time and we
19        check that it is run with the right command line arguments.
20    """
21    version = 1
22    save_directories = [ '/var/cache/shill',
23                         '/run/shill',
24                         '/run/state/logged-in',
25                         '/run/dhcpcd',
26                         '/var/lib/dhcpcd',
27                         ]
28    fake_user = 'not-a-real-user@chromium.org'
29    saved_config = '/tmp/network_ShillInitScripts_saved_config.tgz'
30    cryptohome_path_command = 'cryptohome-path'
31    guest_shill_user_profile_dir = '/run/shill/guest_user_profile/shill'
32    guest_shill_user_log_dir = '/run/shill/guest_user_profile/shill_logs'
33    magic_header = '# --- shill init file test magic header ---'
34
35
36    def start_shill(self):
37        """ Starts a shill instance. """
38        utils.start_service('shill')
39
40
41    def stop_shill(self):
42        """ Halt the running shill instance. """
43        utils.stop_service('shill', ignore_status=True)
44
45        for attempt in range(10):
46            if not self.find_pid('shill'):
47                break
48            time.sleep(1)
49        else:
50            raise error.TestFail('Shill process does not appear to be dying')
51
52
53    def login(self, user=None):
54        """ Simulate the login process.
55
56        Note: "start" blocks until the "script" block completes.
57
58        @param user string user name (email address) to log in.
59
60        """
61
62        if utils.has_systemd():
63            start_cmd = (('systemctl set-environment CHROMEOS_USER=%s'
64                          ' && systemctl start shill-start-user-session') %
65                         (user or self.fake_user))
66        else:
67            start_cmd = ('start shill-start-user-session CHROMEOS_USER=%s' %
68                         (user or self.fake_user))
69        utils.system(start_cmd)
70
71
72    def login_guest(self):
73        """ Simulate guest login.
74
75        For guest login, session-manager passes an empty CHROMEOS_USER arg.
76
77        """
78        self.login('""')
79
80
81    def logout(self):
82        """ Simulate user logout.
83
84        Note: "start" blocks until the "script" block completes.
85
86        """
87        utils.start_service('shill-stop-user-session')
88
89
90    def start_test(self):
91        """ Setup the start of the test.  Stop shill and create test harness."""
92        # Stop a system process on test duts for keeping connectivity up.
93        ret = utils.stop_service('recover_duts', ignore_status=True)
94        self.recover_duts_stopped = (ret == 0);
95
96        self.stop_shill()
97
98        # Deduce the root cryptohome directory name for our fake user.
99        self.root_cryptohome_dir = utils.system_output(
100            '%s system %s' % (self.cryptohome_path_command, self.fake_user))
101
102        # Deduce the user cryptohome directory name for our fake user.
103        self.user_cryptohome_dir = utils.system_output(
104            '%s user %s' % (self.cryptohome_path_command, self.fake_user))
105
106        # Deduce the directory for memory log storage.
107        self.user_cryptohome_log_dir = ('%s/shill_logs' %
108                                        self.root_cryptohome_dir)
109
110        # The sanitized hash of the username is the basename of the cryptohome.
111        self.fake_user_hash = os.path.basename(self.root_cryptohome_dir)
112
113        # Just in case this hash actually exists, add these to the list of
114        # saved directories.
115        self.save_directories.append(self.root_cryptohome_dir)
116        self.save_directories.append(self.user_cryptohome_dir)
117
118        # Archive the system state we will be modifying, then remove them.
119        utils.system('tar zcvf %s --directory / --ignore-failed-read %s'
120                     ' 2>/dev/null' %
121                     (self.saved_config, ' '.join(self.save_directories)))
122        utils.system('rm -rf %s' % ' '.join(self.save_directories),
123                     ignore_status=True)
124
125        # Create the fake user's system cryptohome directory.
126        os.mkdir(self.root_cryptohome_dir)
127        self.new_shill_user_profile_dir = ('%s/shill' %
128                                           self.root_cryptohome_dir)
129        self.new_shill_user_profile = ('%s/shill.profile' %
130                                       self.new_shill_user_profile_dir)
131
132        # Create the fake user's user cryptohome directory.
133        os.mkdir(self.user_cryptohome_dir)
134        self.old_shill_user_profile_dir = ('%s/shill' %
135                                           self.user_cryptohome_dir)
136        self.old_shill_user_profile = ('%s/shill.profile' %
137                                       self.old_shill_user_profile_dir)
138        self.mock_flimflam = None
139
140
141    def start_mock_flimflam(self):
142        """ Start a mock flimflam instance to accept and log DBus calls. """
143        self.mock_flimflam = mock_flimflam.MockFlimflam()
144        self.mock_flimflam.start()
145
146
147    def erase_state(self):
148        """ Remove all the test harness files. """
149        utils.system('rm -rf %s' % ' '.join(self.save_directories))
150        os.mkdir(self.root_cryptohome_dir)
151        os.mkdir(self.user_cryptohome_dir)
152
153
154    def end_test(self):
155        """ Perform cleanup at the end of the test. """
156        if self.mock_flimflam:
157            self.mock_flimflam.quit()
158            self.mock_flimflam.join()
159        self.erase_state()
160        utils.system('tar zxvf %s --directory /' % self.saved_config)
161        utils.system('rm -f %s' % self.saved_config)
162        self.restart_system_processes()
163
164
165    def restart_system_processes(self):
166        """ Restart vital system services at the end of the test. """
167        utils.start_service('shill', ignore_status=True)
168        if self.recover_duts_stopped:
169            utils.start_service('recover_duts', ignore_status=True)
170
171
172    def assure(self, must_be_true, assertion_name):
173        """ Perform a named assertion.
174
175        @param must_be_true boolean parameter that must be true.
176        @param assertion_name string name of this assertion.
177
178        """
179        if not must_be_true:
180            raise error.TestFail('%s: Assertion failed: %s' %
181                                 (self.test_name, assertion_name))
182
183
184    def assure_path_owner(self, path, owner):
185        """ Assert that |path| is owned by |owner|.
186
187        @param path string pathname to test.
188        @param owner string user name that should own |path|.
189
190        """
191        self.assure(pwd.getpwuid(os.stat(path).st_uid)[0] == owner,
192                    'Path %s is owned by %s' % (path, owner))
193
194
195    def assure_path_group(self, path, group):
196        """ Assert that |path| is owned by |group|.
197
198        @param path string pathname to test.
199        @param group string group name that should own |path|.
200
201        """
202        self.assure(grp.getgrgid(os.stat(path).st_gid)[0] == group,
203                    'Path %s is group-owned by %s' % (path, group))
204
205
206    def assure_exists(self, path, path_friendly_name):
207        """ Assert that |path| exists.
208
209        @param path string pathname to test.
210        @param path_friendly_name string user-parsable description of |path|.
211
212        """
213        self.assure(os.path.exists(path), '%s exists' % path_friendly_name)
214
215
216    def assure_is_dir(self, path, path_friendly_name):
217        """ Assert that |path| is a directory.
218
219        @param path string pathname to test.
220        @param path_friendly_name string user-parsable description of |path|.
221
222        """
223        self.assure_exists(path, path_friendly_name)
224        self.assure(stat.S_ISDIR(os.lstat(path).st_mode),
225                    '%s is a directory' % path_friendly_name)
226
227
228    def assure_is_link(self, path, path_friendly_name):
229        """ Assert that |path| is a symbolic link.
230
231        @param path string pathname to test.
232        @param path_friendly_name string user-parsable description of |path|.
233
234        """
235        self.assure_exists(path, path_friendly_name)
236        self.assure(stat.S_ISLNK(os.lstat(path).st_mode),
237                    '%s is a symbolic link' % path_friendly_name)
238
239
240    def assure_is_link_to(self, path, pointee, path_friendly_name):
241        """ Assert that |path| is a symbolic link to |pointee|.
242
243        @param path string pathname to test.
244        @param pointee string pathname that |path| should point to.
245        @param path_friendly_name string user-parsable description of |path|.
246
247        """
248        self.assure_is_link(path, path_friendly_name)
249        self.assure(os.readlink(path) == pointee,
250                    '%s is a symbolic link to %s' %
251                    (path_friendly_name, pointee))
252
253
254    def assure_method_calls(self, expected_method_calls, assertion_name):
255        """ Assert that |expected_method_calls| were executed on mock_flimflam.
256
257        @param expected_method_calls list of string-tuple pairs of method
258            name + tuple of arguments.
259        @param assertion_name string name to assign to the assertion.
260
261        """
262        method_calls = self.mock_flimflam.get_method_calls()
263        if len(expected_method_calls) != len(method_calls):
264            self.assure(False, '%s: method call count does not match' %
265                        assertion_name)
266        for expected, actual in zip(expected_method_calls, method_calls):
267            self.assure(actual.method == expected[0],
268                        '%s: method %s matches expected %s' %
269                        (assertion_name, actual.method, expected[0]))
270            self.assure(actual.argument == expected[1],
271                        '%s: argument %s matches expected %s' %
272                        (assertion_name, actual.argument, expected[1]))
273
274
275    def create_file_with_contents(self, filename, contents):
276        """ Create a file named |filename| that contains |contents|.
277
278        @param filename string name of file.
279        @param contents string contents of file.
280
281        """
282        with open(filename, 'w') as f:
283            f.write(contents)
284
285
286    def touch(self, filename):
287        """ Create an empty file named |filename|.
288
289        @param filename string name of file.
290
291        """
292        self.create_file_with_contents(filename, '')
293
294
295    def create_new_shill_user_profile(self, contents):
296        """ Create a fake new user profile with |contents|.
297
298        @param contents string contents of the new user profile.
299
300        """
301        os.mkdir(self.new_shill_user_profile_dir)
302        self.create_file_with_contents(self.new_shill_user_profile, contents)
303
304
305    def create_old_shill_user_profile(self, contents):
306        """ Create a fake old-style user profile with |contents|.
307
308        @param contents string contents of the old user profile.
309
310        """
311        os.mkdir(self.old_shill_user_profile_dir)
312        self.create_file_with_contents(self.old_shill_user_profile, contents)
313
314
315    def file_contents(self, filename):
316        """ Returns the contents of |filename|.
317
318        @param filename string name of file to read.
319
320        """
321        with open(filename) as f:
322            return f.read()
323
324
325    def find_pid(self, process_name):
326        """ Returns the process id of |process_name|.
327
328        @param process_name string name of process to search for.
329
330        """
331        return utils.system_output('pgrep %s' % process_name,
332                                   ignore_status=True).split('\n')[0]
333
334
335    def get_commandline(self):
336        """ Returns the command line of the current shill executable. """
337        pid = self.find_pid('shill')
338        return file('/proc/%s/cmdline' % pid).read().split('\0')
339
340
341    def run_once(self):
342        """ Main test loop. """
343        try:
344            self.start_test()
345        except:
346            self.restart_system_processes()
347            raise
348
349        try:
350            self.run_tests([
351                self.test_start_shill,
352                self.test_start_logged_in])
353
354            # The tests above run a real instance of shill, whereas the tests
355            # below rely on a mock instance of shill.  We must take care not
356            # to run the mock at the same time as a real shill instance.
357            self.start_mock_flimflam()
358
359            self.run_tests([
360                self.test_login,
361                self.test_login_guest,
362                self.test_login_profile_exists,
363                self.test_login_old_shill_profile,
364                self.test_login_invalid_old_shill_profile,
365                self.test_login_ignore_old_shill_profile,
366                self.test_login_multi_profile,
367                self.test_logout])
368        finally:
369            # Stop any shill instances started during testing.
370            self.stop_shill()
371            self.end_test()
372
373
374    def run_tests(self, tests):
375        """ Executes each of the test subparts in sequence.
376
377        @param tests list of methods to run.
378
379        """
380        for test in tests:
381          self.test_name = test.__name__
382          test()
383          self.stop_shill()
384          self.erase_state()
385
386
387    def test_start_shill(self):
388        """ Test all created pathnames during shill startup.
389
390        Also ensure the push argument is not provided by default.
391
392        """
393        self.start_shill()
394        self.assure_is_dir('/run/shill', 'Shill run directory')
395        self.assure_is_dir('/var/lib/dhcpcd', 'dhcpcd lib directory')
396        self.assure_path_owner('/var/lib/dhcpcd', 'dhcp')
397        self.assure_path_group('/var/lib/dhcpcd', 'dhcp')
398        self.assure_is_dir('/run/dhcpcd', 'dhcpcd run directory')
399        self.assure_path_owner('/run/dhcpcd', 'dhcp')
400        self.assure_path_group('/run/dhcpcd', 'dhcp')
401        self.assure('--push=~chronos/shill' not in self.get_commandline(),
402                    'Shill command line does not contain push argument')
403
404
405    def test_start_logged_in(self):
406        """ Tests starting up shill while a user is already logged in.
407
408        The "--push" argument should not be added even though shill is started
409        while a user is logged in.
410
411        """
412        os.mkdir('/run/shill')
413        os.mkdir('/run/shill/user_profiles')
414        self.create_new_shill_user_profile('')
415        os.symlink(self.new_shill_user_profile_dir,
416                   '/run/shill/user_profiles/chronos')
417        self.touch('/run/state/logged-in')
418        self.start_shill()
419        self.assure('--push=~chronos/shill' not in self.get_commandline(),
420                    'Shill command line does not contain push argument')
421        os.unlink('/run/state/logged-in')
422
423
424    def test_login(self):
425        """ Test the login process.
426
427        Login should create a profile directory, then create and push
428        a user profile, given no previous state.
429
430        """
431        os.mkdir('/run/shill')
432        self.login()
433        self.assure(not os.path.exists(self.old_shill_user_profile),
434                    'Old shill user profile does not exist')
435        self.assure(not os.path.exists(self.new_shill_user_profile),
436                    'New shill user profile does not exist')
437        # The DBus "CreateProfile" method should have been handled
438        # by our mock_flimflam instance, so the profile directory
439        # should not have actually been created.
440        self.assure_is_dir(self.new_shill_user_profile_dir,
441                           'New shill user profile directory')
442        self.assure_is_dir('/run/shill/user_profiles',
443                           'Shill profile root')
444        self.assure_is_link_to('/run/shill/user_profiles/chronos',
445                               self.new_shill_user_profile_dir,
446                               'Shill profile link')
447        self.assure_is_dir(self.user_cryptohome_log_dir,
448                           'shill user log directory')
449        self.assure_is_link_to('/run/shill/log',
450                               self.user_cryptohome_log_dir,
451                               'Shill logs link')
452        self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ],
453                                  [ 'InsertUserProfile',
454                                    ('~chronos/shill', self.fake_user_hash) ]],
455                                 'CreateProfile and InsertUserProfile '
456                                 'are called')
457
458
459    def test_login_guest(self):
460        """ Tests the guest login process.
461
462        Login should create a temporary profile directory in /run,
463        instead of using one within the root directory for normal users.
464
465        """
466        os.mkdir('/run/shill')
467        self.login_guest()
468        self.assure(not os.path.exists(self.old_shill_user_profile),
469                    'Old shill user profile does not exist')
470        self.assure(not os.path.exists(self.new_shill_user_profile),
471                    'New shill user profile does not exist')
472        self.assure(not os.path.exists(self.new_shill_user_profile_dir),
473                    'New shill user profile directory')
474        self.assure_is_dir(self.guest_shill_user_profile_dir,
475                           'shill guest user profile directory')
476        self.assure_is_dir('/run/shill/user_profiles',
477                           'Shill profile root')
478        self.assure_is_link_to('/run/shill/user_profiles/chronos',
479                               self.guest_shill_user_profile_dir,
480                               'Shill profile link')
481        self.assure_is_dir(self.guest_shill_user_log_dir,
482                           'shill guest user log directory')
483        self.assure_is_link_to('/run/shill/log',
484                               self.guest_shill_user_log_dir,
485                               'Shill logs link')
486        self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ],
487                                  [ 'InsertUserProfile',
488                                    ('~chronos/shill', '') ]],
489                                 'CreateProfile and InsertUserProfile '
490                                 'are called')
491
492
493    def test_login_profile_exists(self):
494        """ Test logging in a user whose profile already exists.
495
496        Login script should only push (and not create) the user profile
497        if a user profile already exists.
498        """
499        os.mkdir('/run/shill')
500        os.mkdir(self.new_shill_user_profile_dir)
501        self.touch(self.new_shill_user_profile)
502        self.login()
503        self.assure_method_calls([[ 'InsertUserProfile',
504                                    ('~chronos/shill', self.fake_user_hash) ]],
505                                 'Only InsertUserProfile is called')
506
507
508    def test_login_old_shill_profile(self):
509        """ Test logging in a user with an old-style shill profile.
510
511        Login script should move an old shill user profile into place
512        if a new one does not exist.
513        """
514        os.mkdir('/run/shill')
515        self.create_old_shill_user_profile(self.magic_header)
516        self.login()
517        self.assure(not os.path.exists(self.old_shill_user_profile),
518                    'Old shill user profile no longer exists')
519        self.assure(not os.path.exists(self.old_shill_user_profile_dir),
520                    'Old shill user profile directory no longer exists')
521        self.assure_exists(self.new_shill_user_profile,
522                           'New shill profile')
523        self.assure(self.magic_header in
524                    self.file_contents(self.new_shill_user_profile),
525                    'Shill user profile contains our magic header')
526        self.assure_method_calls([[ 'InsertUserProfile',
527                                    ('~chronos/shill', self.fake_user_hash) ]],
528                                 'Only InsertUserProfile is called')
529
530
531    def make_symlink(self, path):
532        """ Create a symbolic link named |path|.
533
534        @param path string pathname of the symbolic link.
535
536        """
537        os.symlink('/etc/hosts', path)
538
539
540    def make_special_file(self, path):
541        """ Create a special file named |path|.
542
543        @param path string pathname of the special file.
544
545        """
546        os.mknod(path, stat.S_IFIFO)
547
548
549    def make_bad_owner(self, path):
550        """ Create a regular file with a strange ownership.
551
552        @param path string pathname of the file.
553
554        """
555        self.touch(path)
556        os.lchown(path, 1000, 1000)
557
558
559    def test_login_invalid_old_shill_profile(self):
560        """ Test logging in with an invalid old-style shill profile.
561
562        Login script should ignore non-regular files or files not owned
563        by the correct user.  The original file should be removed.
564
565        """
566        os.mkdir('/run/shill')
567        for file_creation_method in (self.make_symlink,
568                                     self.make_special_file,
569                                     os.mkdir,
570                                     self.make_bad_owner):
571            os.mkdir(self.old_shill_user_profile_dir)
572            file_creation_method(self.old_shill_user_profile)
573            self.login()
574            self.assure(not os.path.exists(self.old_shill_user_profile),
575                        'Old shill user profile no longer exists')
576            self.assure(not os.path.exists(self.old_shill_user_profile_dir),
577                        'Old shill user profile directory no longer exists')
578            self.assure(not os.path.exists(self.new_shill_user_profile),
579                        'New shill profile was not created')
580            self.assure_method_calls([[ 'CreateProfile', '~chronos/shill' ],
581                                      [ 'InsertUserProfile',
582                                        ('~chronos/shill',
583                                         self.fake_user_hash) ]],
584                                     'CreateProfile and InsertUserProfile '
585                                     'are called')
586            os.unlink('/run/shill/user_profiles/chronos')
587
588
589    def test_login_ignore_old_shill_profile(self):
590        """ Test logging in with both an old and new profile present.
591
592        Login script should ignore an old shill user profile if a new one
593        exists.
594
595        """
596        os.mkdir('/run/shill')
597        self.create_new_shill_user_profile('')
598        self.create_old_shill_user_profile(self.magic_header)
599        self.login()
600        self.assure(os.path.exists(self.old_shill_user_profile),
601                    'Old shill user profile still exists')
602        self.assure_exists(self.new_shill_user_profile,
603                           'New shill profile')
604        self.assure(self.magic_header not in
605                    self.file_contents(self.new_shill_user_profile),
606                    'Shill user profile does not contain our magic header')
607        self.assure_method_calls([[ 'InsertUserProfile',
608                                    ('~chronos/shill', self.fake_user_hash) ]],
609                                 'Only InsertUserProfile is called')
610
611
612    def test_login_multi_profile(self):
613        """ Test signalling shill about multiple logged-in users.
614
615        Login script should not create multiple profiles in parallel
616        if called more than once without an intervening logout.  Only
617        the initial user profile should be created.
618
619        """
620        os.mkdir('/run/shill')
621        self.create_new_shill_user_profile('')
622
623        # First logged-in user should create a profile (tested above).
624        self.login()
625
626        # Clear the mock method-call queue.
627        self.mock_flimflam.get_method_calls()
628
629        for attempt in range(5):
630            self.login()
631            self.assure_method_calls([], 'No more profiles are added to shill')
632            profile_links = os.listdir('/run/shill/user_profiles')
633            self.assure(len(profile_links) == 1, 'Only one profile exists')
634            self.assure(profile_links[0] == 'chronos',
635                        'The profile link is for the chronos user')
636            self.assure_is_link_to('/run/shill/log',
637                                   self.user_cryptohome_log_dir,
638                                   'Shill log link for chronos')
639
640
641    def test_logout(self):
642        """ Test the logout process. """
643        os.makedirs('/run/shill/user_profiles')
644        os.makedirs(self.guest_shill_user_profile_dir)
645        os.makedirs(self.guest_shill_user_log_dir)
646        self.touch('/run/state/logged-in')
647        self.logout()
648        self.assure(not os.path.exists('/run/shill/user_profiles'),
649                    'User profile directory was removed')
650        self.assure(not os.path.exists(self.guest_shill_user_profile_dir),
651                    'Guest user profile directory was removed')
652        self.assure(not os.path.exists(self.guest_shill_user_log_dir),
653                    'Guest user log directory was removed')
654        self.assure_method_calls([[ 'PopAllUserProfiles', '' ]],
655                                 'PopAllUserProfiles is called')
656