1#!/usr/bin/python
2
3import cPickle
4import os, unittest
5import common
6from autotest_lib.client.bin import local_host
7from autotest_lib.client.common_lib import global_config
8from autotest_lib.client.common_lib import utils
9from autotest_lib.client.common_lib.test_utils import mock
10from autotest_lib.frontend import setup_django_lite_environment
11from autotest_lib.scheduler import drone_manager, drone_utility, drones
12from autotest_lib.scheduler import scheduler_config, drone_manager
13from autotest_lib.scheduler import thread_lib
14from autotest_lib.scheduler import pidfile_monitor
15from autotest_lib.server.hosts import ssh_host
16
17
18class MockDrone(drones._AbstractDrone):
19    def __init__(self, name, active_processes=0, max_processes=10,
20                 allowed_users=None):
21        super(MockDrone, self).__init__()
22        self.name = name
23        self.hostname = name
24        self.active_processes = active_processes
25        self.max_processes = max_processes
26        self.allowed_users = allowed_users
27        self._host = 'mock_drone'
28        # maps method names list of tuples containing method arguments
29        self._recorded_calls = {'queue_call': [],
30                                'send_file_to': []}
31
32
33    def queue_call(self, method, *args, **kwargs):
34        self._recorded_calls['queue_call'].append((method, args, kwargs))
35
36
37    def call(self, method, *args, **kwargs):
38        # don't bother differentiating between call() and queue_call()
39        return self.queue_call(method, *args, **kwargs)
40
41
42    def send_file_to(self, drone, source_path, destination_path,
43                     can_fail=False):
44        self._recorded_calls['send_file_to'].append(
45                (drone, source_path, destination_path))
46
47
48    # method for use by tests
49    def _check_for_recorded_call(self, method_name, arguments):
50        recorded_arg_list = self._recorded_calls[method_name]
51        was_called = arguments in recorded_arg_list
52        if not was_called:
53            print 'Recorded args:', recorded_arg_list
54            print 'Expected:', arguments
55        return was_called
56
57
58    def was_call_queued(self, method, *args, **kwargs):
59        return self._check_for_recorded_call('queue_call',
60                                             (method, args, kwargs))
61
62
63    def was_file_sent(self, drone, source_path, destination_path):
64        return self._check_for_recorded_call('send_file_to',
65                                             (drone, source_path,
66                                              destination_path))
67
68
69class DroneManager(unittest.TestCase):
70    _DRONE_INSTALL_DIR = '/drone/install/dir'
71    _DRONE_RESULTS_DIR = os.path.join(_DRONE_INSTALL_DIR, 'results')
72    _RESULTS_DIR = '/results/dir'
73    _SOURCE_PATH = 'source/path'
74    _DESTINATION_PATH = 'destination/path'
75    _WORKING_DIRECTORY = 'working/directory'
76    _USERNAME = 'my_user'
77
78    def setUp(self):
79        self.god = mock.mock_god()
80        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
81                           self._DRONE_INSTALL_DIR)
82        self.manager = drone_manager.DroneManager()
83        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
84
85        # we don't want this to ever actually get called
86        self.god.stub_function(drones, 'get_drone')
87        # we don't want the DroneManager to go messing with global config
88        def do_nothing():
89            pass
90        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
91
92        # set up some dummy drones
93        self.mock_drone = MockDrone('mock_drone')
94        self.manager._drones[self.mock_drone.name] = self.mock_drone
95        self.results_drone = MockDrone('results_drone', 0, 10)
96        self.manager._results_drone = self.results_drone
97
98        self.mock_drone_process = drone_manager.Process(self.mock_drone.name, 0)
99
100
101    def tearDown(self):
102        self.god.unstub_all()
103
104
105    def _test_choose_drone_for_execution_helper(self, processes_info_list,
106                                                requested_processes):
107        for index, process_info in enumerate(processes_info_list):
108            active_processes, max_processes = process_info
109            self.manager._enqueue_drone(
110                    MockDrone(index, active_processes, max_processes,
111                              allowed_users=None)
112                    )
113
114        return self.manager._choose_drone_for_execution(
115                requested_processes, self._USERNAME, None)
116
117
118    def test_choose_drone_for_execution(self):
119        drone = self._test_choose_drone_for_execution_helper([(1, 2), (0, 2)],
120                                                             1)
121        self.assertEquals(drone.name, 1)
122
123
124    def test_choose_drone_for_execution_some_full(self):
125        drone = self._test_choose_drone_for_execution_helper([(0, 1), (1, 3)],
126                                                             2)
127        self.assertEquals(drone.name, 1)
128
129
130    def test_choose_drone_for_execution_all_full(self):
131        drone = self._test_choose_drone_for_execution_helper([(2, 1), (3, 2)],
132                                                             1)
133        self.assertEquals(drone.name, 1)
134
135
136    def test_choose_drone_for_execution_all_full_same_percentage_capacity(self):
137        drone = self._test_choose_drone_for_execution_helper([(5, 3), (10, 6)],
138                                                             1)
139        self.assertEquals(drone.name, 1)
140
141
142    def test_user_restrictions(self):
143        # this drone is restricted to a different user
144        self.manager._enqueue_drone(MockDrone(1, max_processes=10,
145                                              allowed_users=['fakeuser']))
146        # this drone is allowed but has lower capacity
147        self.manager._enqueue_drone(MockDrone(2, max_processes=2,
148                                              allowed_users=[self._USERNAME]))
149
150        self.assertEquals(2,
151                          self.manager.max_runnable_processes(self._USERNAME,
152                                                              None))
153        drone = self.manager._choose_drone_for_execution(
154                1, username=self._USERNAME, drone_hostnames_allowed=None)
155        self.assertEquals(drone.name, 2)
156
157
158    def test_user_restrictions_with_full_drone(self):
159        # this drone is restricted to a different user
160        self.manager._enqueue_drone(MockDrone(1, max_processes=10,
161                                              allowed_users=['fakeuser']))
162        # this drone is allowed but is full
163        self.manager._enqueue_drone(MockDrone(2, active_processes=3,
164                                              max_processes=2,
165                                              allowed_users=[self._USERNAME]))
166
167        self.assertEquals(0,
168                          self.manager.max_runnable_processes(self._USERNAME,
169                                                              None))
170        drone = self.manager._choose_drone_for_execution(
171                1, username=self._USERNAME, drone_hostnames_allowed=None)
172        self.assertEquals(drone.name, 2)
173
174
175    def _setup_test_drone_restrictions(self, active_processes=0):
176        self.manager._enqueue_drone(MockDrone(
177                1, active_processes=active_processes, max_processes=10))
178        self.manager._enqueue_drone(MockDrone(
179                2, active_processes=active_processes, max_processes=5))
180        self.manager._enqueue_drone(MockDrone(
181                3, active_processes=active_processes, max_processes=2))
182
183
184    def test_drone_restrictions_allow_any(self):
185        self._setup_test_drone_restrictions()
186        self.assertEquals(10,
187                          self.manager.max_runnable_processes(self._USERNAME,
188                                                              None))
189        drone = self.manager._choose_drone_for_execution(
190                1, username=self._USERNAME, drone_hostnames_allowed=None)
191        self.assertEqual(drone.name, 1)
192
193
194    def test_drone_restrictions_under_capacity(self):
195        self._setup_test_drone_restrictions()
196        drone_hostnames_allowed = (2, 3)
197        self.assertEquals(
198                5, self.manager.max_runnable_processes(self._USERNAME,
199                                                       drone_hostnames_allowed))
200        drone = self.manager._choose_drone_for_execution(
201                1, username=self._USERNAME,
202                drone_hostnames_allowed=drone_hostnames_allowed)
203
204        self.assertEqual(drone.name, 2)
205
206
207    def test_drone_restrictions_over_capacity(self):
208        self._setup_test_drone_restrictions(active_processes=6)
209        drone_hostnames_allowed = (2, 3)
210        self.assertEquals(
211                0, self.manager.max_runnable_processes(self._USERNAME,
212                                                       drone_hostnames_allowed))
213        drone = self.manager._choose_drone_for_execution(
214                7, username=self._USERNAME,
215                drone_hostnames_allowed=drone_hostnames_allowed)
216        self.assertEqual(drone.name, 2)
217
218
219    def test_drone_restrictions_allow_none(self):
220        self._setup_test_drone_restrictions()
221        drone_hostnames_allowed = ()
222        self.assertEquals(
223                0, self.manager.max_runnable_processes(self._USERNAME,
224                                                       drone_hostnames_allowed))
225        drone = self.manager._choose_drone_for_execution(
226                1, username=self._USERNAME,
227                drone_hostnames_allowed=drone_hostnames_allowed)
228        self.assertEqual(drone, None)
229
230
231    def test_initialize(self):
232        results_hostname = 'results_repo'
233        results_install_dir = '/results/install'
234        global_config.global_config.override_config_value(
235                scheduler_config.CONFIG_SECTION,
236                'results_host_installation_directory', results_install_dir)
237
238        (drones.get_drone.expect_call(self.mock_drone.name)
239         .and_return(self.mock_drone))
240
241        results_drone = MockDrone('results_drone')
242        self.god.stub_function(results_drone, 'set_autotest_install_dir')
243        drones.get_drone.expect_call(results_hostname).and_return(results_drone)
244        results_drone.set_autotest_install_dir.expect_call(results_install_dir)
245
246        self.manager.initialize(base_results_dir=self._RESULTS_DIR,
247                                drone_hostnames=[self.mock_drone.name],
248                                results_repository_hostname=results_hostname)
249
250        self.assert_(self.mock_drone.was_call_queued(
251                'initialize', self._DRONE_RESULTS_DIR + '/'))
252        self.god.check_playback()
253
254
255    def test_execute_command(self):
256        self.manager._enqueue_drone(self.mock_drone)
257
258        pidfile_name = 'my_pidfile'
259        log_file = 'log_file'
260
261        pidfile_id = self.manager.execute_command(
262                command=['test', drone_manager.WORKING_DIRECTORY],
263                working_directory=self._WORKING_DIRECTORY,
264                pidfile_name=pidfile_name,
265                num_processes=1,
266                log_file=log_file)
267
268        full_working_directory = os.path.join(self._DRONE_RESULTS_DIR,
269                                              self._WORKING_DIRECTORY)
270        self.assertEquals(pidfile_id.path,
271                          os.path.join(full_working_directory, pidfile_name))
272        self.assert_(self.mock_drone.was_call_queued(
273                'execute_command', ['test', full_working_directory],
274                full_working_directory,
275                os.path.join(self._DRONE_RESULTS_DIR, log_file), pidfile_name))
276
277
278    def test_attach_file_to_execution(self):
279        self.manager._enqueue_drone(self.mock_drone)
280
281        contents = 'my\ncontents'
282        attached_path = self.manager.attach_file_to_execution(
283                self._WORKING_DIRECTORY, contents)
284        self.manager.execute_command(command=['test'],
285                                     working_directory=self._WORKING_DIRECTORY,
286                                     pidfile_name='mypidfile',
287                                     num_processes=1,
288                                     drone_hostnames_allowed=None)
289
290        self.assert_(self.mock_drone.was_call_queued(
291                'write_to_file',
292                os.path.join(self._DRONE_RESULTS_DIR, attached_path),
293                contents))
294
295
296    def test_copy_results_on_drone(self):
297        self.manager.copy_results_on_drone(self.mock_drone_process,
298                                           self._SOURCE_PATH,
299                                           self._DESTINATION_PATH)
300        self.assert_(self.mock_drone.was_call_queued(
301                'copy_file_or_directory',
302                os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
303                os.path.join(self._DRONE_RESULTS_DIR, self._DESTINATION_PATH)))
304
305
306    def test_copy_to_results_repository(self):
307        drone_manager.ENABLE_ARCHIVING = True
308        self.manager._copy_to_results_repository(self.mock_drone_process,
309                                                 self._SOURCE_PATH)
310        self.assert_(self.mock_drone.was_file_sent(
311                self.results_drone,
312                os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
313                os.path.join(self._RESULTS_DIR, self._SOURCE_PATH)))
314
315
316    def test_write_lines_to_file(self):
317        file_path = 'file/path'
318        lines = ['line1', 'line2']
319        written_data = 'line1\nline2\n'
320
321        # write to results repository
322        self.manager.write_lines_to_file(file_path, lines)
323        self.assert_(self.results_drone.was_call_queued(
324                'write_to_file', os.path.join(self._RESULTS_DIR, file_path),
325                written_data))
326
327        # write to a drone
328        self.manager.write_lines_to_file(
329                file_path, lines, paired_with_process=self.mock_drone_process)
330        self.assert_(self.mock_drone.was_call_queued(
331                'write_to_file',
332                os.path.join(self._DRONE_RESULTS_DIR, file_path), written_data))
333
334
335    def test_pidfile_expiration(self):
336        self.god.stub_with(self.manager, '_get_max_pidfile_refreshes',
337                           lambda: 0)
338        pidfile_id = self.manager.get_pidfile_id_from('tag', 'name')
339        self.manager.register_pidfile(pidfile_id)
340        self.manager._drop_old_pidfiles()
341        self.manager._drop_old_pidfiles()
342        self.assertFalse(self.manager._registered_pidfile_info)
343
344
345class ThreadedDroneTest(unittest.TestCase):
346    _DRONE_INSTALL_DIR = '/drone/install/dir'
347    _RESULTS_DIR = '/results/dir'
348    _DRONE_CLASS = drones._RemoteDrone
349    _DRONE_HOST = ssh_host.SSHHost
350
351
352    def create_drone(self, drone_hostname, mock_hostname,
353                     timestamp_remote_calls=False):
354        """Create and initialize a Remote Drone.
355
356        @return: A remote drone instance.
357        """
358        mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
359        self.god.stub_function(drones.drone_utility, 'create_host')
360        drones.drone_utility.create_host.expect_call(drone_hostname).and_return(
361                mock_host)
362        mock_host.is_up.expect_call().and_return(True)
363        return self._DRONE_CLASS(drone_hostname,
364                                 timestamp_remote_calls=timestamp_remote_calls)
365
366
367    def create_fake_pidfile_info(self, tag='tag', name='name'):
368        pidfile_id = self.manager.get_pidfile_id_from(tag, name)
369        self.manager.register_pidfile(pidfile_id)
370        return self.manager._registered_pidfile_info
371
372
373    def setUp(self):
374        self.god = mock.mock_god()
375        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
376                           self._DRONE_INSTALL_DIR)
377        self.manager = drone_manager.DroneManager()
378        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)
379
380        # we don't want this to ever actually get called
381        self.god.stub_function(drones, 'get_drone')
382        # we don't want the DroneManager to go messing with global config
383        def do_nothing():
384            pass
385        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)
386
387        self.results_drone = MockDrone('results_drone', 0, 10)
388        self.manager._results_drone = self.results_drone
389        self.drone_utility_path = 'mock-drone-utility-path'
390        self.mock_return = {'results': ['mock results'],
391                            'warnings': []}
392
393
394    def tearDown(self):
395        self.god.unstub_all()
396
397    def test_trigger_refresh(self):
398        """Test drone manager trigger refresh."""
399        self.god.stub_with(self._DRONE_CLASS, '_drone_utility_path',
400                           self.drone_utility_path)
401        mock_drone = self.create_drone('fakedrone1', 'fakehost1')
402        self.manager._drones[mock_drone.hostname] = mock_drone
403
404        # Create some fake pidfiles and confirm that a refresh call is
405        # executed on each drone host, with the same pidfile paths. Then
406        # check that each drone gets a key in the returned results dictionary.
407        for i in range(0, 1):
408            pidfile_info = self.create_fake_pidfile_info(
409                    'tag%s' % i, 'name%s' %i)
410        pidfile_paths = [pidfile.path for pidfile in pidfile_info.keys()]
411        refresh_call = drone_utility.call('refresh', pidfile_paths)
412        expected_results = {}
413        mock_result = utils.CmdResult(
414                stdout=cPickle.dumps(self.mock_return))
415        for drone in self.manager.get_drones():
416            drone._host.run.expect_call(
417                    'python %s' % self.drone_utility_path,
418                    stdin=cPickle.dumps([refresh_call]), stdout_tee=None,
419                    connect_timeout=mock.is_instance_comparator(int)
420                ).and_return(mock_result)
421            expected_results[drone] = self.mock_return['results']
422        self.manager.trigger_refresh()
423        self.assertTrue(self.manager._refresh_task_queue.get_results() ==
424                        expected_results)
425        self.god.check_playback()
426
427
428    def test_sync_refresh(self):
429        """Test drone manager sync refresh."""
430
431        mock_drone = self.create_drone('fakedrone1', 'fakehost1')
432        self.manager._drones[mock_drone.hostname] = mock_drone
433
434        # Insert some drone_utility results into the results queue, then
435        # check that get_results returns it in the right format, and that
436        # the rest of sync_refresh populates the right datastructures for
437        # correct handling of agents. Also confirm that this method of
438        # syncing is sufficient for the monitor to pick up the exit status
439        # of the process in the same way it would in handle_agents.
440        pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute'
441        pidfiles = {pidfile_path: '123\n12\n0\n'}
442        drone_utility_results = {
443                'pidfiles': pidfiles,
444                'autoserv_processes':{},
445                'all_processes':{},
446                'parse_processes':{},
447                'pidfiles_second_read':pidfiles,
448        }
449        # Our manager instance isn't the drone manager singletone that the
450        # pidfile_monitor will use by default, becuase setUp doesn't call
451        # drone_manager.instance().
452        self.god.stub_with(drone_manager, '_the_instance', self.manager)
453        monitor = pidfile_monitor.PidfileRunMonitor()
454        monitor.pidfile_id = drone_manager.PidfileId(pidfile_path)
455        self.manager.register_pidfile(monitor.pidfile_id)
456        self.assertTrue(monitor._state.exit_status == None)
457
458        self.manager._refresh_task_queue.results_queue.put(
459                thread_lib.ThreadedTaskQueue.result(
460                    mock_drone, [drone_utility_results]))
461        self.manager.sync_refresh()
462        pidfiles = self.manager._pidfiles
463        pidfile_id = pidfiles.keys()[0]
464        pidfile_contents = pidfiles[pidfile_id]
465
466        self.assertTrue(
467                pidfile_id.path == pidfile_path and
468                pidfile_contents.process.pid == 123 and
469                pidfile_contents.process.hostname ==
470                        mock_drone.hostname and
471                pidfile_contents.exit_status == 12 and
472                pidfile_contents.num_tests_failed == 0)
473        self.assertTrue(monitor.exit_code() == 12)
474        self.god.check_playback()
475
476
477class ThreadedLocalhostDroneTest(ThreadedDroneTest):
478    _DRONE_CLASS = drones._LocalDrone
479    _DRONE_HOST = local_host.LocalHost
480
481
482    def create_drone(self, drone_hostname, mock_hostname,
483                     timestamp_remote_calls=False):
484        """Create and initialize a Remote Drone.
485
486        @return: A remote drone instance.
487        """
488        mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
489        self.god.stub_function(drones.drone_utility, 'create_host')
490        local_drone = self._DRONE_CLASS(
491                timestamp_remote_calls=timestamp_remote_calls)
492        self.god.stub_with(local_drone, '_host', mock_host)
493        return local_drone
494
495
496if __name__ == '__main__':
497    unittest.main()
498