1import heapq
2import os
3import logging
4
5import common
6from autotest_lib.client.common_lib import error
7from autotest_lib.client.common_lib import global_config
8from autotest_lib.client.common_lib import utils
9from autotest_lib.scheduler import drone_task_queue
10from autotest_lib.scheduler import drone_utility
11from autotest_lib.scheduler import drones
12from autotest_lib.scheduler import scheduler_config
13from autotest_lib.scheduler import thread_lib
14
15try:
16    from chromite.lib import metrics
17except ImportError:
18    metrics = utils.metrics_mock
19
20
# results on drones will be placed under the drone_installation_directory in a
# directory with this name
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Sentinel that callers place in a command list; execute_command() substitutes
# the absolute path of the working directory for it.
WORKING_DIRECTORY = object() # see execute_command()


# Names of the pidfiles written into a working directory (one name per kind of
# process, judging by the constant names).
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
                     ARCHIVER_PID_FILE)

# When true (the default), DroneManager runs drone calls through
# thread_lib.ThreadedTaskQueue; otherwise through
# drone_task_queue.DroneTaskQueue.
_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
        type=bool, default=True)

HOSTS_JOB_SUBDIR = 'hosts/'
PARSE_LOG = '.parse.log'
# Read from the scheduler config section at import time; no default is passed.
ENABLE_ARCHIVING =  global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)
44
45
class DroneManagerError(Exception):
    """Error raised by DroneManager when an operation cannot proceed."""
48
49
class CustomEquals(object):
    """Base class giving value-based equality and hashing keyed on _id()."""

    def _id(self):
        """Return the hashable value that identifies this object.

        Subclasses must override this.
        """
        raise NotImplementedError


    def __eq__(self, other):
        # Only objects of our own type (or a subclass of it) are comparable.
        if isinstance(other, type(self)):
            return self._id() == other._id()
        return NotImplemented


    def __ne__(self, other):
        return not (self == other)


    def __hash__(self):
        identity = self._id()
        return hash(identity)
67
68
class Process(CustomEquals):
    """A process on a drone.

    Equality and hashing use (hostname, pid) only; ppid is carried along but
    is not part of the identity.
    """
    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        return self.hostname, self.pid


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        default_repr = super(Process, self).__repr__()
        return '%s<%s>' % (default_repr, self)
85
86
class PidfileId(CustomEquals):
    """Identifies a pidfile; two ids are equal when their paths are equal."""
    def __init__(self, path):
        self.path = path


    def _id(self):
        # The path alone is the identity used for equality and hashing.
        return self.path


    def __str__(self):
        path = self.path
        return str(path)
98
99
class _PidfileInfo(object):
    """Bookkeeping DroneManager keeps for each registered pidfile."""
    # Number of processes accounted to this pidfile; None until one is set.
    num_processes = None
    # Incremented by DroneManager._drop_old_pidfiles() and reset to 0 whenever
    # the pidfile is registered or its contents are read.
    age = 0
103
104
class PidfileContents(object):
    """Parsed contents of a pidfile.

    All three attributes stay None until the corresponding line has been read
    from the pidfile.
    """
    # Process named by the pidfile, or None if no pid has been read.
    process = None
    # Exit status of the process, or None while none has been recorded.
    exit_status = None
    # Number of failed tests, or None while none has been recorded.
    num_tests_failed = None

    def is_invalid(self):
        """Return False: this object represents a readable pidfile."""
        return False


    def is_running(self):
        """Return True if a process is known and no exit status is recorded.

        The exit status is compared against None rather than tested for
        truthiness, so a process that exited with status 0 is reported as
        finished, not as still running.
        """
        return self.process is not None and self.exit_status is None
116
117
class InvalidPidfile(object):
    """Stand-in for PidfileContents when a pidfile could not be parsed.

    Exposes the same attributes and methods as PidfileContents so callers can
    treat both uniformly.
    """
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        """
        @param error: Description of what is wrong with the pidfile.
        """
        self.error = error


    def is_invalid(self):
        """Return True: this object always represents an unparsable pidfile."""
        return True


    def is_running(self):
        """Return False: no process can be derived from an invalid pidfile."""
        return False


    def __str__(self):
        # __str__ must return a string; coerce so that a non-string error
        # (e.g. an exception instance) does not make str() raise TypeError.
        return str(self.error)
138
139
class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.
    """
    def __init__(self, drone):
        self.drone = drone


    def __lt__(self, other):
        # heapq orders items with "<".  Python 3 never consults __cmp__, so
        # the ordering has to be available as a rich comparison.
        assert isinstance(other, _DroneHeapWrapper)
        return self.drone.used_capacity() < other.drone.used_capacity()


    def __cmp__(self, other):
        # Kept for Python 2, where the remaining comparison operators still
        # fall back to __cmp__.  Written without the cmp() builtin, which
        # does not exist on Python 3.
        assert isinstance(other, _DroneHeapWrapper)
        mine = self.drone.used_capacity()
        theirs = other.drone.used_capacity()
        return (mine > theirs) - (mine < theirs)
152
153
154class DroneManager(object):
155    """
156    This class acts as an interface from the scheduler to drones, whether it be
157    only a single "drone" for localhost or multiple remote drones.
158
    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
161    """
162
163
164    # Minimum time to wait before next email
165    # about a drone hitting process limit is sent.
166    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
167    _STATS_KEY = 'drone_manager'
168
169
170
171    def __init__(self):
172        # absolute path of base results dir
173        self._results_dir = None
174        # holds Process objects
175        self._process_set = set()
176        # holds the list of all processes running on all drones
177        self._all_processes = {}
178        # maps PidfileId to PidfileContents
179        self._pidfiles = {}
180        # same as _pidfiles
181        self._pidfiles_second_read = {}
182        # maps PidfileId to _PidfileInfo
183        self._registered_pidfile_info = {}
184        # used to generate unique temporary paths
185        self._temporary_path_counter = 0
186        # maps hostname to Drone object
187        self._drones = {}
188        self._results_drone = None
189        # maps results dir to dict mapping file path to contents
190        self._attached_files = {}
191        # heapq of _DroneHeapWrappers
192        self._drone_queue = []
193        # A threaded task queue used to refresh drones asynchronously.
194        if _THREADED_DRONE_MANAGER:
195            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
196                    name='%s.refresh_queue' % self._STATS_KEY)
197        else:
198            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()
199
200
201    def initialize(self, base_results_dir, drone_hostnames,
202                   results_repository_hostname):
203        self._results_dir = base_results_dir
204
205        for hostname in drone_hostnames:
206            self._add_drone(hostname)
207
208        if not self._drones:
209            # all drones failed to initialize
210            raise DroneManagerError('No valid drones found')
211
212        self.refresh_drone_configs()
213
214        logging.info('Using results repository on %s',
215                     results_repository_hostname)
216        self._results_drone = drones.get_drone(results_repository_hostname)
217        results_installation_dir = global_config.global_config.get_config_value(
218                scheduler_config.CONFIG_SECTION,
219                'results_host_installation_directory', default=None)
220        if results_installation_dir:
221            self._results_drone.set_autotest_install_dir(
222                    results_installation_dir)
223        # don't initialize() the results drone - we don't want to clear out any
224        # directories and we don't need to kill any processes
225
226
227    def reinitialize_drones(self):
228        for drone in self.get_drones():
229            with metrics.SecondsTimer(
230                    'chromeos/autotest/drone_manager/'
231                    'reinitialize_drones_duration',
232                    fields={'drone': drone.hostname}):
233                drone.call('initialize', self._results_dir)
234
235
236    def shutdown(self):
237        for drone in self.get_drones():
238            drone.shutdown()
239
240
241    def _get_max_pidfile_refreshes(self):
242        """
243        Normally refresh() is called on every monitor_db.Dispatcher.tick().
244
245        @returns: The number of refresh() calls before we forget a pidfile.
246        """
247        pidfile_timeout = global_config.global_config.get_config_value(
248                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
249                type=int, default=2000)
250        return pidfile_timeout
251
252
253    def _add_drone(self, hostname):
254        """
255        Add drone.
256
257        Catches AutoservRunError if the drone fails initialization and does not
258        add it to the list of usable drones.
259
260        @param hostname: Hostname of the drone we are trying to add.
261        """
262        logging.info('Adding drone %s' % hostname)
263        drone = drones.get_drone(hostname)
264        if drone:
265            try:
266                drone.call('initialize', self.absolute_path(''))
267            except error.AutoservRunError as e:
268                logging.error('Failed to initialize drone %s with error: %s',
269                              hostname, e)
270                return
271            self._drones[drone.hostname] = drone
272
273
274    def _remove_drone(self, hostname):
275        self._drones.pop(hostname, None)
276
277
278    def refresh_drone_configs(self):
279        """
280        Reread global config options for all drones.
281        """
282        # Import server_manager_utils is delayed rather than at the beginning of
283        # this module. The reason is that test_that imports drone_manager when
284        # importing autoserv_utils. The import is done before test_that setup
285        # django (test_that only setup django in setup_local_afe, since it's
286        # not needed when test_that runs the test in a lab duts through :lab:
287        # option. Therefore, if server_manager_utils is imported at the
288        # beginning of this module, test_that will fail since django is not
289        # setup yet.
290        from autotest_lib.site_utils import server_manager_utils
291        config = global_config.global_config
292        section = scheduler_config.CONFIG_SECTION
293        config.parse_config_file()
294        for hostname, drone in self._drones.iteritems():
295            if server_manager_utils.use_server_db():
296                server = server_manager_utils.get_servers(hostname=hostname)[0]
297                attributes = dict([(a.attribute, a.value)
298                                   for a in server.attributes.all()])
299                drone.enabled = (
300                        int(attributes.get('disabled', 0)) == 0)
301                drone.max_processes = int(
302                        attributes.get(
303                            'max_processes',
304                            scheduler_config.config.max_processes_per_drone))
305                allowed_users = attributes.get('users', None)
306            else:
307                disabled = config.get_config_value(
308                        section, '%s_disabled' % hostname, default='')
309                drone.enabled = not bool(disabled)
310                drone.max_processes = config.get_config_value(
311                        section, '%s_max_processes' % hostname, type=int,
312                        default=scheduler_config.config.max_processes_per_drone)
313
314                allowed_users = config.get_config_value(
315                        section, '%s_users' % hostname, default=None)
316            if allowed_users:
317                drone.allowed_users = set(allowed_users.split())
318            else:
319                drone.allowed_users = None
320            logging.info('Drone %s.max_processes: %s', hostname,
321                         drone.max_processes)
322            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
323            logging.info('Drone %s.allowed_users: %s', hostname,
324                         drone.allowed_users)
325
326        self._reorder_drone_queue() # max_processes may have changed
327        # Clear notification record about reaching max_processes limit.
328        self._notify_record = {}
329
330
331    def get_drones(self):
332        return self._drones.itervalues()
333
334
335    def cleanup_orphaned_containers(self):
336        """Queue cleanup_orphaned_containers call at each drone.
337        """
338        for drone in self._drones.values():
339            logging.info('Queue cleanup_orphaned_containers at %s',
340                         drone.hostname)
341            drone.queue_call('cleanup_orphaned_containers')
342
343
344    def _get_drone_for_process(self, process):
345        return self._drones[process.hostname]
346
347
348    def _get_drone_for_pidfile_id(self, pidfile_id):
349        pidfile_contents = self.get_pidfile_contents(pidfile_id)
350        if pidfile_contents.process is None:
351          raise DroneManagerError('Fail to get a drone due to empty pidfile')
352        return self._get_drone_for_process(pidfile_contents.process)
353
354
355    def get_drone_for_pidfile_id(self, pidfile_id):
356        """Public API for luciferlib.
357
358        @param pidfile_id: PidfileId instance.
359        """
360        return self._get_drone_for_pidfile_id(pidfile_id)
361
362
363    def _drop_old_pidfiles(self):
364        # use items() since the dict is modified in unregister_pidfile()
365        for pidfile_id, info in self._registered_pidfile_info.items():
366            if info.age > self._get_max_pidfile_refreshes():
367                logging.warning('dropping leaked pidfile %s', pidfile_id)
368                self.unregister_pidfile(pidfile_id)
369            else:
370                info.age += 1
371
372
373    def _reset(self):
374        self._process_set = set()
375        self._all_processes = {}
376        self._pidfiles = {}
377        self._pidfiles_second_read = {}
378        self._drone_queue = []
379
380
381    def _parse_pidfile(self, drone, raw_contents):
382        """Parse raw pidfile contents.
383
384        @param drone: The drone on which this pidfile was found.
385        @param raw_contents: The raw contents of a pidfile, eg:
386            "pid\nexit_staus\nnum_tests_failed\n".
387        """
388        contents = PidfileContents()
389        if not raw_contents:
390            return contents
391        lines = raw_contents.splitlines()
392        if len(lines) > 3:
393            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
394                                  (len(lines), lines))
395        try:
396            pid = int(lines[0])
397            contents.process = Process(drone.hostname, pid)
398            # if len(lines) == 2, assume we caught Autoserv between writing
399            # exit_status and num_failed_tests, so just ignore it and wait for
400            # the next cycle
401            if len(lines) == 3:
402                contents.exit_status = int(lines[1])
403                contents.num_tests_failed = int(lines[2])
404        except ValueError, exc:
405            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
406
407        return contents
408
409
410    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
411        for pidfile_path, contents in pidfiles.iteritems():
412            pidfile_id = PidfileId(pidfile_path)
413            contents = self._parse_pidfile(drone, contents)
414            store_in_dict[pidfile_id] = contents
415
416
417    def _add_process(self, drone, process_info):
418        process = Process(drone.hostname, int(process_info['pid']),
419                          int(process_info['ppid']))
420        self._process_set.add(process)
421
422
423    def _add_autoserv_process(self, drone, process_info):
424        assert process_info['comm'] == 'autoserv'
425        # only root autoserv processes have pgid == pid
426        if process_info['pgid'] != process_info['pid']:
427            return
428        self._add_process(drone, process_info)
429
430
431    def _enqueue_drone(self, drone):
432        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))
433
434
435    def _reorder_drone_queue(self):
436        heapq.heapify(self._drone_queue)
437
438
439    def reorder_drone_queue(self):
440        """Reorder drone queue according to modified process counts.
441
442        This public API is exposed for luciferlib to wrap.
443        """
444        self._reorder_drone_queue()
445
446
447    def _compute_active_processes(self, drone):
448        drone.active_processes = 0
449        for pidfile_id, contents in self._pidfiles.iteritems():
450            is_running = contents.exit_status is None
451            on_this_drone = (contents.process
452                             and contents.process.hostname == drone.hostname)
453            if is_running and on_this_drone:
454                info = self._registered_pidfile_info[pidfile_id]
455                if info.num_processes is not None:
456                    drone.active_processes += info.num_processes
457
458        metrics.Gauge('chromeos/autotest/drone/active_processes').set(
459                drone.active_processes,
460                fields={'drone_hostname': drone.hostname})
461
462
463    def _check_drone_process_limit(self, drone):
464        """
465        Notify if the number of processes on |drone| is approaching limit.
466
467        @param drone: A Drone object.
468        """
469        try:
470            percent = float(drone.active_processes) / drone.max_processes
471        except ZeroDivisionError:
472            percent = 100
473        metrics.Float('chromeos/autotest/drone/active_process_percentage'
474                      ).set(percent, fields={'drone_hostname': drone.hostname})
475
476    def trigger_refresh(self):
477        """Triggers a drone manager refresh.
478
479        @raises DroneManagerError: If a drone has un-executed calls.
480            Since they will get clobbered when we queue refresh calls.
481        """
482        self._reset()
483        self._drop_old_pidfiles()
484        pidfile_paths = [pidfile_id.path
485                         for pidfile_id in self._registered_pidfile_info]
486        drones = list(self.get_drones())
487        for drone in drones:
488            calls = drone.get_calls()
489            if calls:
490                raise DroneManagerError('Drone %s has un-executed calls: %s '
491                                        'which might get corrupted through '
492                                        'this invocation' %
493                                        (drone, [str(call) for call in calls]))
494            drone.queue_call('refresh', pidfile_paths)
495        logging.info("Invoking drone refresh.")
496        with metrics.SecondsTimer(
497                'chromeos/autotest/drone_manager/trigger_refresh_duration'):
498            self._refresh_task_queue.execute(drones, wait=False)
499
500
501    def sync_refresh(self):
502        """Complete the drone refresh started by trigger_refresh.
503
504        Waits for all drone threads then refreshes internal datastructures
505        with drone process information.
506        """
507
508        # This gives us a dictionary like what follows:
509        # {drone: [{'pidfiles': (raw contents of pidfile paths),
510        #           'autoserv_processes': (autoserv process info from ps),
511        #           'all_processes': (all process info from ps),
512        #           'parse_processes': (parse process infor from ps),
513        #           'pidfile_second_read': (pidfile contents, again),}]
514        #   drone2: ...}
515        # The values of each drone are only a list because this adheres to the
516        # drone utility interface (each call is executed and its results are
517        # places in a list, but since we never couple the refresh calls with
518        # any other call, this list will always contain a single dict).
519        with metrics.SecondsTimer(
520                'chromeos/autotest/drone_manager/sync_refresh_duration'):
521            all_results = self._refresh_task_queue.get_results()
522        logging.info("Drones refreshed.")
523
524        # The loop below goes through and parses pidfile contents. Pidfiles
525        # are used to track autoserv execution, and will always contain < 3
526        # lines of the following: pid, exit code, number of tests. Each pidfile
527        # is identified by a PidfileId object, which contains a unique pidfile
528        # path (unique because it contains the job id) making it hashable.
529        # All pidfiles are stored in the drone managers _pidfiles dict as:
530        #   {pidfile_id: pidfile_contents(Process(drone, pid),
531        #                                 exit_code, num_tests_failed)}
532        # In handle agents, each agent knows its pidfile_id, and uses this
533        # to retrieve the refreshed contents of its pidfile via the
534        # PidfileRunMonitor (through its tick) before making decisions. If
535        # the agent notices that its process has exited, it unregisters the
536        # pidfile from the drone_managers._registered_pidfile_info dict
537        # through its epilog.
538        for drone, results_list in all_results.iteritems():
539            results = results_list[0]
540            drone_hostname = drone.hostname.replace('.', '_')
541
542            for process_info in results['all_processes']:
543                if process_info['comm'] == 'autoserv':
544                    self._add_autoserv_process(drone, process_info)
545                drone_pid = drone.hostname, int(process_info['pid'])
546                self._all_processes[drone_pid] = process_info
547
548            for process_info in results['parse_processes']:
549                self._add_process(drone, process_info)
550
551            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
552            self._process_pidfiles(drone, results['pidfiles_second_read'],
553                                   self._pidfiles_second_read)
554
555            self._compute_active_processes(drone)
556            if drone.enabled:
557                self._enqueue_drone(drone)
558                self._check_drone_process_limit(drone)
559
560
561    def refresh(self):
562        """Refresh all drones."""
563        with metrics.SecondsTimer(
564                'chromeos/autotest/drone_manager/refresh_duration'):
565            self.trigger_refresh()
566            self.sync_refresh()
567
568
569    @metrics.SecondsTimerDecorator(
570        'chromeos/autotest/drone_manager/execute_actions_duration')
571    def execute_actions(self):
572        """
573        Called at the end of a scheduler cycle to execute all queued actions
574        on drones.
575        """
576        # Invoke calls queued on all drones since the last call to execute
577        # and wait for them to return.
578        if _THREADED_DRONE_MANAGER:
579            thread_lib.ThreadedTaskQueue(
580                    name='%s.execute_queue' % self._STATS_KEY).execute(
581                            self._drones.values())
582        else:
583            drone_task_queue.DroneTaskQueue().execute(self._drones.values())
584
585        try:
586            self._results_drone.execute_queued_calls()
587        except error.AutoservError:
588            m = 'chromeos/autotest/errors/results_repository_failed'
589            metrics.Counter(m).increment(
590                fields={'drone_hostname': self._results_drone.hostname})
591            self._results_drone.clear_call_queue()
592
593
594    def get_orphaned_autoserv_processes(self):
595        """
596        Returns a set of Process objects for orphaned processes only.
597        """
598        return set(process for process in self._process_set
599                   if process.ppid == 1)
600
601
602    def kill_process(self, process):
603        """
604        Kill the given process.
605        """
606        logging.info('killing %s', process)
607        drone = self._get_drone_for_process(process)
608        drone.queue_kill_process(process)
609
610
611    def _ensure_directory_exists(self, path):
612        if not os.path.exists(path):
613            os.makedirs(path)
614
615
616    def total_running_processes(self):
617        return sum(drone.active_processes for drone in self.get_drones())
618
619
620    def max_runnable_processes(self, username, drone_hostnames_allowed):
621        """
622        Return the maximum number of processes that can be run (in a single
623        execution) given the current load on drones.
624        @param username: login of user to run a process.  may be None.
625        @param drone_hostnames_allowed: list of drones that can be used. May be
626                                        None
627        """
628        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
629                                 if wrapper.drone.usable_by(username) and
630                                 (drone_hostnames_allowed is None or
631                                          wrapper.drone.hostname in
632                                                  drone_hostnames_allowed)]
633        if not usable_drone_wrappers:
634            # all drones disabled or inaccessible
635            return 0
636        runnable_processes = [
637                wrapper.drone.max_processes - wrapper.drone.active_processes
638                for wrapper in usable_drone_wrappers]
639        return max([0] + runnable_processes)
640
641
642    def _least_loaded_drone(self, drones):
643        return min(drones, key=lambda d: d.used_capacity())
644
645
646    def pick_drone_to_use(self, num_processes=1):
647        """Return a drone to use.
648
649        Various options can be passed to optimize drone selection.
650
651        num_processes is the number of processes the drone is intended
652        to run.
653
654        This public API is exposed for luciferlib to wrap.
655
656        Returns a drone instance (see drones.py).
657        """
658        return self._choose_drone_for_execution(
659                num_processes=num_processes,
660                username=None,  # Always allow all drones
661                drone_hostnames_allowed=None,  # Always allow all drones
662        )
663
664
665    def _choose_drone_for_execution(self, num_processes, username,
666                                    drone_hostnames_allowed):
667        """Choose a drone to execute command.
668
669        @param num_processes: Number of processes needed for execution.
670        @param username: Name of the user to execute the command.
671        @param drone_hostnames_allowed: A list of names of drone allowed.
672
673        @return: A drone object to be used for execution.
674        """
675        # cycle through drones is order of increasing used capacity until
676        # we find one that can handle these processes
677        checked_drones = []
678        usable_drones = []
679        drone_to_use = None
680        while self._drone_queue:
681            drone = heapq.heappop(self._drone_queue).drone
682            checked_drones.append(drone)
683            logging.info('Checking drone %s', drone.hostname)
684            if not drone.usable_by(username):
685                continue
686
687            drone_allowed = (drone_hostnames_allowed is None
688                             or drone.hostname in drone_hostnames_allowed)
689            if not drone_allowed:
690                logging.debug('Drone %s not allowed: ', drone.hostname)
691                continue
692
693            usable_drones.append(drone)
694
695            if drone.active_processes + num_processes <= drone.max_processes:
696                drone_to_use = drone
697                break
698            logging.info('Drone %s has %d active + %s requested > %s max',
699                         drone.hostname, drone.active_processes, num_processes,
700                         drone.max_processes)
701
702        if not drone_to_use and usable_drones:
703            # Drones are all over loaded, pick the one with least load.
704            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
705                                                   drone.active_processes,
706                                                   drone.max_processes)
707                                     for drone in usable_drones)
708            logging.error('No drone has capacity to handle %d processes (%s) '
709                          'for user %s', num_processes, drone_summary, username)
710            drone_to_use = self._least_loaded_drone(usable_drones)
711
712        # refill _drone_queue
713        for drone in checked_drones:
714            self._enqueue_drone(drone)
715
716        return drone_to_use
717
718
719    def _substitute_working_directory_into_command(self, command,
720                                                   working_directory):
721        for i, item in enumerate(command):
722            if item is WORKING_DIRECTORY:
723                command[i] = working_directory
724
725
726    def execute_command(self, command, working_directory, pidfile_name,
727                        num_processes, log_file=None, paired_with_pidfile=None,
728                        username=None, drone_hostnames_allowed=None):
729        """
730        Execute the given command, taken as an argv list.
731
732        @param command: command to execute as a list.  if any item is
733                WORKING_DIRECTORY, the absolute path to the working directory
734                will be substituted for it.
735        @param working_directory: directory in which the pidfile will be written
736        @param pidfile_name: name of the pidfile this process will write
737        @param num_processes: number of processes to account for from this
738                execution
739        @param log_file (optional): path (in the results repository) to hold
740                command output.
741        @param paired_with_pidfile (optional): a PidfileId for an
742                already-executed process; the new process will execute on the
743                same drone as the previous process.
744        @param username (optional): login of the user responsible for this
745                process.
746        @param drone_hostnames_allowed (optional): hostnames of the drones that
747                                                   this command is allowed to
748                                                   execute on
749        """
750        abs_working_directory = self.absolute_path(working_directory)
751        if not log_file:
752            log_file = self.get_temporary_path('execute')
753        log_file = self.absolute_path(log_file)
754
755        self._substitute_working_directory_into_command(command,
756                                                        abs_working_directory)
757
758        if paired_with_pidfile:
759            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
760        else:
761            drone = self._choose_drone_for_execution(
762                    num_processes, username, drone_hostnames_allowed)
763
764        if not drone:
765            raise DroneManagerError('command failed; no drones available: %s'
766                                    % command)
767
768        logging.info("command = %s", command)
769        logging.info('log file = %s:%s', drone.hostname, log_file)
770        self._write_attached_files(working_directory, drone)
771        drone.queue_call('execute_command', command, abs_working_directory,
772                         log_file, pidfile_name)
773        drone.active_processes += num_processes
774        self._reorder_drone_queue()
775
776        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
777        pidfile_id = PidfileId(pidfile_path)
778        self.register_pidfile(pidfile_id)
779        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
780        return pidfile_id
781
782
783    def get_pidfile_id_from(self, execution_tag, pidfile_name):
784        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
785        return PidfileId(path)
786
787
788    def register_pidfile(self, pidfile_id):
789        """
790        Indicate that the DroneManager should look for the given pidfile when
791        refreshing.
792        """
793        if pidfile_id not in self._registered_pidfile_info:
794            logging.info('monitoring pidfile %s', pidfile_id)
795            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
796        self._reset_pidfile_age(pidfile_id)
797
798
799    def _reset_pidfile_age(self, pidfile_id):
800        if pidfile_id in self._registered_pidfile_info:
801            self._registered_pidfile_info[pidfile_id].age = 0
802
803
804    def unregister_pidfile(self, pidfile_id):
805        if pidfile_id in self._registered_pidfile_info:
806            logging.info('forgetting pidfile %s', pidfile_id)
807            del self._registered_pidfile_info[pidfile_id]
808
809
810    def declare_process_count(self, pidfile_id, num_processes):
811        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
812
813
814    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
815        """
816        Retrieve a PidfileContents object for the given pidfile_id.  If
817        use_second_read is True, use results that were read after the processes
818        were checked, instead of before.
819        """
820        self._reset_pidfile_age(pidfile_id)
821        if use_second_read:
822            pidfile_map = self._pidfiles_second_read
823        else:
824            pidfile_map = self._pidfiles
825        return pidfile_map.get(pidfile_id, PidfileContents())
826
827
828    def is_process_running(self, process):
829        """
830        Check if the given process is in the running process list.
831        """
832        if process in self._process_set:
833            return True
834
835        drone_pid = process.hostname, process.pid
836        if drone_pid in self._all_processes:
837            logging.error('Process %s found, but not an autoserv process. '
838                    'Is %s', process, self._all_processes[drone_pid])
839            return True
840
841        return False
842
843
844    def get_temporary_path(self, base_name):
845        """
846        Get a new temporary path guaranteed to be unique across all drones
847        for this scheduler execution.
848        """
849        self._temporary_path_counter += 1
850        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
851                            '%s.%s' % (base_name, self._temporary_path_counter))
852
853
854    def absolute_path(self, path, on_results_repository=False):
855        if on_results_repository:
856            base_dir = self._results_dir
857        else:
858            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
859                                    _DRONE_RESULTS_DIR_SUFFIX)
860        return os.path.join(base_dir, path)
861
862
863    def _copy_results_helper(self, process, source_path, destination_path,
864                             to_results_repository=False):
865        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
866                      'destination_path: %s, to_results_repository: %s',
867                      process, source_path, destination_path,
868                      to_results_repository)
869        full_source = self.absolute_path(source_path)
870        full_destination = self.absolute_path(
871                destination_path, on_results_repository=to_results_repository)
872        source_drone = self._get_drone_for_process(process)
873        if to_results_repository:
874            source_drone.send_file_to(self._results_drone, full_source,
875                                      full_destination, can_fail=True)
876        else:
877            source_drone.queue_call('copy_file_or_directory', full_source,
878                                    full_destination)
879
880
881    def copy_to_results_repository(self, process, source_path,
882                                   destination_path=None):
883        """
884        Copy results from the given process at source_path to destination_path
885        in the results repository.
886
887        This will only copy the results back for Special Agent Tasks (Cleanup,
888        Verify, Repair) that reside in the hosts/ subdirectory of results if
889        the copy_task_results_back flag has been set to True inside
890        global_config.ini
891
892        It will also only copy .parse.log files back to the scheduler if the
893        copy_parse_log_back flag in global_config.ini has been set to True.
894        """
895        if not ENABLE_ARCHIVING:
896            return
897        copy_task_results_back = global_config.global_config.get_config_value(
898                scheduler_config.CONFIG_SECTION, 'copy_task_results_back',
899                type=bool)
900        copy_parse_log_back = global_config.global_config.get_config_value(
901                scheduler_config.CONFIG_SECTION, 'copy_parse_log_back',
902                type=bool)
903        special_task = source_path.startswith(HOSTS_JOB_SUBDIR)
904        parse_log = source_path.endswith(PARSE_LOG)
905        if (copy_task_results_back or not special_task) and (
906                copy_parse_log_back or not parse_log):
907            if destination_path is None:
908                destination_path = source_path
909            self._copy_results_helper(process, source_path, destination_path,
910                                      to_results_repository=True)
911
912    def _copy_to_results_repository(self, process, source_path,
913                                   destination_path=None):
914        """
915        Copy results from the given process at source_path to destination_path
916        in the results repository, without special task handling.
917        """
918        if destination_path is None:
919            destination_path = source_path
920        self._copy_results_helper(process, source_path, destination_path,
921                                  to_results_repository=True)
922
923
924    def copy_results_on_drone(self, process, source_path, destination_path):
925        """
926        Copy a results directory from one place to another on the drone.
927        """
928        self._copy_results_helper(process, source_path, destination_path)
929
930
931    def _write_attached_files(self, results_dir, drone):
932        attached_files = self._attached_files.pop(results_dir, {})
933        for file_path, contents in attached_files.iteritems():
934            drone.queue_call('write_to_file', self.absolute_path(file_path),
935                             contents)
936
937
938    def attach_file_to_execution(self, results_dir, file_contents,
939                                 file_path=None):
940        """
941        When the process for the results directory is executed, the given file
942        contents will be placed in a file on the drone.  Returns the path at
943        which the file will be placed.
944        """
945        if not file_path:
946            file_path = self.get_temporary_path('attach')
947        files_for_execution = self._attached_files.setdefault(results_dir, {})
948        assert file_path not in files_for_execution
949        files_for_execution[file_path] = file_contents
950        return file_path
951
952
953    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
954        """
955        Write the given lines (as a list of strings) to a file.  If
956        paired_with_process is given, the file will be written on the drone
957        running the given Process.  Otherwise, the file will be written to the
958        results repository.
959        """
960        file_contents = '\n'.join(lines) + '\n'
961        if paired_with_process:
962            drone = self._get_drone_for_process(paired_with_process)
963            on_results_repository = False
964        else:
965            drone = self._results_drone
966            on_results_repository = True
967        full_path = self.absolute_path(
968                file_path, on_results_repository=on_results_repository)
969        drone.queue_call('write_to_file', full_path, file_contents)
970
971
# Module-wide DroneManager singleton; stays None until instance() or
# _set_instance() installs one.
_the_instance = None


def instance():
    """
    Return the shared DroneManager, creating it on first use.
    """
    if _the_instance is not None:
        return _the_instance
    _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    """
    Install the given object as the shared instance returned by instance().
    """
    global _the_instance
    _the_instance = instance
983