1import heapq
2import os
3import logging
4
5import common
6from autotest_lib.client.common_lib import error
7from autotest_lib.client.common_lib import global_config
8from autotest_lib.client.common_lib import utils
9from autotest_lib.scheduler import drones
10from autotest_lib.scheduler import drone_utility
11from autotest_lib.scheduler import drone_task_queue
12from autotest_lib.scheduler import scheduler_config
13from autotest_lib.scheduler import thread_lib
14
15try:
16    from chromite.lib import metrics
17except ImportError:
18    metrics = utils.metrics_mock
19
20
# Name of the directory, located under the drone_installation_directory, in
# which results are placed on drones.
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Placeholder object for command arguments; see execute_command().
WORKING_DIRECTORY = object()


# Well-known pidfile names.
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (
        AUTOSERV_PID_FILE,
        CRASHINFO_PID_FILE,
        PARSER_PID_FILE,
        ARCHIVER_PID_FILE,
)
35
# Scheduler config option 'threaded_drone_manager' (True when unset); selects
# between the threaded and the plain drone task queues below.
_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION,
        'threaded_drone_manager',
        default=True,
        type=bool)
39
40
class DroneManagerError(Exception):
    """Error raised by the drone manager."""
43
44
class CustomEquals(object):
    """Base class providing equality and hashing based on _id()."""

    def _id(self):
        """Return the value used for comparison and hashing.

        Must be overridden by subclasses.
        """
        raise NotImplementedError


    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self._id() == other._id()
        # Let Python try the reflected comparison for unrelated types.
        return NotImplemented


    def __ne__(self, other):
        are_equal = (self == other)
        return not are_equal


    def __hash__(self):
        identity = self._id()
        return hash(identity)
62
63
class Process(CustomEquals):
    """A process on a drone, compared and hashed by (hostname, pid)."""

    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid

    def _id(self):
        # ppid takes no part in equality or hashing.
        return (self.hostname, self.pid)


    def __str__(self):
        host_and_pid = (self.hostname, self.pid)
        return '%s/%s' % host_and_pid


    def __repr__(self):
        default_repr = super(Process, self).__repr__()
        return default_repr + '<%s>' % self
80
81
class PidfileId(CustomEquals):
    """Identifier of a pidfile, compared and hashed by its path."""

    def __init__(self, path):
        self.path = path


    def _id(self):
        return self.path


    def __str__(self):
        path_text = str(self.path)
        return path_text
93
94
class _PidfileInfo(object):
    """Bookkeeping record kept for each registered pidfile."""
    # process count declared for the pidfile; None until one is set
    num_processes = None
    # incremented by _drop_old_pidfiles(), reset by _reset_pidfile_age()
    age = 0
98
99
class PidfileContents(object):
    """Parsed contents of a pidfile.

    Every field is None until the corresponding value has been read from the
    pidfile.
    """
    process = None
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        """@returns: Always False; these contents are not corrupt."""
        return False


    def is_running(self):
        """Tell whether the pidfile describes a process that has not exited.

        An exit status of 0 means the process finished, so the check is
        against None rather than truthiness.

        @returns: True if a process is recorded and no exit status has been
                written yet, False otherwise.
        """
        return self.process is not None and self.exit_status is None
111
112
class InvalidPidfile(object):
    """Result used for a pidfile whose contents could not be parsed.

    Exposes the same attributes and methods as PidfileContents, plus the
    error message describing the problem.
    """
    process = exit_status = num_tests_failed = None


    def __init__(self, error):
        # message describing why the pidfile is invalid
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        message = self.error
        return message
133
134
class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity.

    Ordering is provided through both __lt__ (the only comparison heapq
    needs, and the only mechanism available on Python 3) and __cmp__ (for
    Python 2 callers).
    """
    def __init__(self, drone):
        self.drone = drone


    def _compare(self, other):
        """Three-way comparison of used capacity.

        @param other: Another _DroneHeapWrapper.

        @returns: -1, 0 or 1, like the Python 2 cmp() builtin.
        """
        assert isinstance(other, _DroneHeapWrapper)
        mine = self.drone.used_capacity()
        theirs = other.drone.used_capacity()
        # Equivalent of cmp(mine, theirs) without the Python 2 only builtin.
        return (mine > theirs) - (mine < theirs)


    def __cmp__(self, other):
        return self._compare(other)


    def __lt__(self, other):
        return self._compare(other) < 0
147
148
149class BaseDroneManager(object):
150    """
151    This class acts as an interface from the scheduler to drones, whether it be
152    only a single "drone" for localhost or multiple remote drones.
153
    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
156    """
157
158
159    # Minimum time to wait before next email
160    # about a drone hitting process limit is sent.
161    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
162    _STATS_KEY = 'drone_manager'
163
164    _ACTIVE_PROCESS_GAUGE = metrics.Gauge(
165        'chromeos/autotest/drone/active_processes')
166
167
168    def __init__(self):
169        # absolute path of base results dir
170        self._results_dir = None
171        # holds Process objects
172        self._process_set = set()
173        # holds the list of all processes running on all drones
174        self._all_processes = {}
175        # maps PidfileId to PidfileContents
176        self._pidfiles = {}
177        # same as _pidfiles
178        self._pidfiles_second_read = {}
179        # maps PidfileId to _PidfileInfo
180        self._registered_pidfile_info = {}
181        # used to generate unique temporary paths
182        self._temporary_path_counter = 0
183        # maps hostname to Drone object
184        self._drones = {}
185        self._results_drone = None
186        # maps results dir to dict mapping file path to contents
187        self._attached_files = {}
188        # heapq of _DroneHeapWrappers
189        self._drone_queue = []
190        # A threaded task queue used to refresh drones asynchronously.
191        if _THREADED_DRONE_MANAGER:
192            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
193                    name='%s.refresh_queue' % self._STATS_KEY)
194        else:
195            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()
196
197
198    def initialize(self, base_results_dir, drone_hostnames,
199                   results_repository_hostname):
200        self._results_dir = base_results_dir
201
202        for hostname in drone_hostnames:
203            self._add_drone(hostname)
204
205        if not self._drones:
206            # all drones failed to initialize
207            raise DroneManagerError('No valid drones found')
208
209        self.refresh_drone_configs()
210
211        logging.info('Using results repository on %s',
212                     results_repository_hostname)
213        self._results_drone = drones.get_drone(results_repository_hostname)
214        results_installation_dir = global_config.global_config.get_config_value(
215                scheduler_config.CONFIG_SECTION,
216                'results_host_installation_directory', default=None)
217        if results_installation_dir:
218            self._results_drone.set_autotest_install_dir(
219                    results_installation_dir)
220        # don't initialize() the results drone - we don't want to clear out any
221        # directories and we don't need to kill any processes
222
223
224    def reinitialize_drones(self):
225        for drone in self.get_drones():
226            with metrics.SecondsTimer(
227                    'chromeos/autotest/drone_manager/'
228                    'reinitialize_drones_duration',
229                    fields={'drone': drone.hostname}):
230                drone.call('initialize', self._results_dir)
231
232
233    def shutdown(self):
234        for drone in self.get_drones():
235            drone.shutdown()
236
237
238    def _get_max_pidfile_refreshes(self):
239        """
240        Normally refresh() is called on every monitor_db.Dispatcher.tick().
241
242        @returns: The number of refresh() calls before we forget a pidfile.
243        """
244        pidfile_timeout = global_config.global_config.get_config_value(
245                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
246                type=int, default=2000)
247        return pidfile_timeout
248
249
250    def _add_drone(self, hostname):
251        logging.info('Adding drone %s', hostname)
252        drone = drones.get_drone(hostname)
253        if drone:
254            self._drones[drone.hostname] = drone
255            drone.call('initialize', self.absolute_path(''))
256
257
258    def _remove_drone(self, hostname):
259        self._drones.pop(hostname, None)
260
261
262    def refresh_drone_configs(self):
263        """
264        Reread global config options for all drones.
265        """
266        # Import server_manager_utils is delayed rather than at the beginning of
267        # this module. The reason is that test_that imports drone_manager when
268        # importing autoserv_utils. The import is done before test_that setup
269        # django (test_that only setup django in setup_local_afe, since it's
270        # not needed when test_that runs the test in a lab duts through :lab:
271        # option. Therefore, if server_manager_utils is imported at the
272        # beginning of this module, test_that will fail since django is not
273        # setup yet.
274        from autotest_lib.site_utils import server_manager_utils
275        config = global_config.global_config
276        section = scheduler_config.CONFIG_SECTION
277        config.parse_config_file()
278        for hostname, drone in self._drones.iteritems():
279            if server_manager_utils.use_server_db():
280                server = server_manager_utils.get_servers(hostname=hostname)[0]
281                attributes = dict([(a.attribute, a.value)
282                                   for a in server.attributes.all()])
283                drone.enabled = (
284                        int(attributes.get('disabled', 0)) == 0)
285                drone.max_processes = int(
286                        attributes.get(
287                            'max_processes',
288                            scheduler_config.config.max_processes_per_drone))
289                allowed_users = attributes.get('users', None)
290            else:
291                disabled = config.get_config_value(
292                        section, '%s_disabled' % hostname, default='')
293                drone.enabled = not bool(disabled)
294                drone.max_processes = config.get_config_value(
295                        section, '%s_max_processes' % hostname, type=int,
296                        default=scheduler_config.config.max_processes_per_drone)
297
298                allowed_users = config.get_config_value(
299                        section, '%s_users' % hostname, default=None)
300            if allowed_users:
301                drone.allowed_users = set(allowed_users.split())
302            else:
303                drone.allowed_users = None
304            logging.info('Drone %s.max_processes: %s', hostname,
305                         drone.max_processes)
306            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
307            logging.info('Drone %s.allowed_users: %s', hostname,
308                         drone.allowed_users)
309            logging.info('Drone %s.support_ssp: %s', hostname,
310                         drone.support_ssp)
311
312        self._reorder_drone_queue() # max_processes may have changed
313        # Clear notification record about reaching max_processes limit.
314        self._notify_record = {}
315
316
317    def get_drones(self):
318        return self._drones.itervalues()
319
320
321    def cleanup_orphaned_containers(self):
322        """Queue cleanup_orphaned_containers call at each drone.
323        """
324        for drone in self._drones.values():
325            logging.info('Queue cleanup_orphaned_containers at %s',
326                         drone.hostname)
327            drone.queue_call('cleanup_orphaned_containers')
328
329
330    def _get_drone_for_process(self, process):
331        return self._drones[process.hostname]
332
333
334    def _get_drone_for_pidfile_id(self, pidfile_id):
335        pidfile_contents = self.get_pidfile_contents(pidfile_id)
336        assert pidfile_contents.process is not None
337        return self._get_drone_for_process(pidfile_contents.process)
338
339
340    def _drop_old_pidfiles(self):
341        # use items() since the dict is modified in unregister_pidfile()
342        for pidfile_id, info in self._registered_pidfile_info.items():
343            if info.age > self._get_max_pidfile_refreshes():
344                logging.warning('dropping leaked pidfile %s', pidfile_id)
345                self.unregister_pidfile(pidfile_id)
346            else:
347                info.age += 1
348
349
350    def _reset(self):
351        self._process_set = set()
352        self._all_processes = {}
353        self._pidfiles = {}
354        self._pidfiles_second_read = {}
355        self._drone_queue = []
356
357
358    def _parse_pidfile(self, drone, raw_contents):
359        """Parse raw pidfile contents.
360
361        @param drone: The drone on which this pidfile was found.
362        @param raw_contents: The raw contents of a pidfile, eg:
363            "pid\nexit_staus\nnum_tests_failed\n".
364        """
365        contents = PidfileContents()
366        if not raw_contents:
367            return contents
368        lines = raw_contents.splitlines()
369        if len(lines) > 3:
370            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
371                                  (len(lines), lines))
372        try:
373            pid = int(lines[0])
374            contents.process = Process(drone.hostname, pid)
375            # if len(lines) == 2, assume we caught Autoserv between writing
376            # exit_status and num_failed_tests, so just ignore it and wait for
377            # the next cycle
378            if len(lines) == 3:
379                contents.exit_status = int(lines[1])
380                contents.num_tests_failed = int(lines[2])
381        except ValueError, exc:
382            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
383
384        return contents
385
386
387    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
388        for pidfile_path, contents in pidfiles.iteritems():
389            pidfile_id = PidfileId(pidfile_path)
390            contents = self._parse_pidfile(drone, contents)
391            store_in_dict[pidfile_id] = contents
392
393
394    def _add_process(self, drone, process_info):
395        process = Process(drone.hostname, int(process_info['pid']),
396                          int(process_info['ppid']))
397        self._process_set.add(process)
398
399
400    def _add_autoserv_process(self, drone, process_info):
401        assert process_info['comm'] == 'autoserv'
402        # only root autoserv processes have pgid == pid
403        if process_info['pgid'] != process_info['pid']:
404            return
405        self._add_process(drone, process_info)
406
407
408    def _enqueue_drone(self, drone):
409        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))
410
411
412    def _reorder_drone_queue(self):
413        heapq.heapify(self._drone_queue)
414
415
416    def _compute_active_processes(self, drone):
417        drone.active_processes = 0
418        for pidfile_id, contents in self._pidfiles.iteritems():
419            is_running = contents.exit_status is None
420            on_this_drone = (contents.process
421                             and contents.process.hostname == drone.hostname)
422            if is_running and on_this_drone:
423                info = self._registered_pidfile_info[pidfile_id]
424                if info.num_processes is not None:
425                    drone.active_processes += info.num_processes
426        self._ACTIVE_PROCESS_GAUGE.set(
427                drone.active_processes,
428                fields={'drone_hostname': drone.hostname})
429
430
431    def _check_drone_process_limit(self, drone):
432        """
433        Notify if the number of processes on |drone| is approaching limit.
434
435        @param drone: A Drone object.
436        """
437        try:
438            percent = float(drone.active_processes) / drone.max_processes
439        except ZeroDivisionError:
440            percent = 100
441        metrics.Float('chromeos/autotest/drone/active_process_percentage'
442                      ).set(percent, fields={'drone_hostname': drone.hostname})
443
444    def trigger_refresh(self):
445        """Triggers a drone manager refresh.
446
447        @raises DroneManagerError: If a drone has un-executed calls.
448            Since they will get clobbered when we queue refresh calls.
449        """
450        self._reset()
451        self._drop_old_pidfiles()
452        pidfile_paths = [pidfile_id.path
453                         for pidfile_id in self._registered_pidfile_info]
454        drones = list(self.get_drones())
455        for drone in drones:
456            calls = drone.get_calls()
457            if calls:
458                raise DroneManagerError('Drone %s has un-executed calls: %s '
459                                        'which might get corrupted through '
460                                        'this invocation' %
461                                        (drone, [str(call) for call in calls]))
462            drone.queue_call('refresh', pidfile_paths)
463        logging.info("Invoking drone refresh.")
464        with metrics.SecondsTimer(
465                'chromeos/autotest/drone_manager/trigger_refresh_duration'):
466            self._refresh_task_queue.execute(drones, wait=False)
467
468
469    def sync_refresh(self):
470        """Complete the drone refresh started by trigger_refresh.
471
472        Waits for all drone threads then refreshes internal datastructures
473        with drone process information.
474        """
475
476        # This gives us a dictionary like what follows:
477        # {drone: [{'pidfiles': (raw contents of pidfile paths),
478        #           'autoserv_processes': (autoserv process info from ps),
479        #           'all_processes': (all process info from ps),
480        #           'parse_processes': (parse process infor from ps),
481        #           'pidfile_second_read': (pidfile contents, again),}]
482        #   drone2: ...}
483        # The values of each drone are only a list because this adheres to the
484        # drone utility interface (each call is executed and its results are
485        # places in a list, but since we never couple the refresh calls with
486        # any other call, this list will always contain a single dict).
487        with metrics.SecondsTimer(
488                'chromeos/autotest/drone_manager/sync_refresh_duration'):
489            all_results = self._refresh_task_queue.get_results()
490        logging.info("Drones refreshed.")
491
492        # The loop below goes through and parses pidfile contents. Pidfiles
493        # are used to track autoserv execution, and will always contain < 3
494        # lines of the following: pid, exit code, number of tests. Each pidfile
495        # is identified by a PidfileId object, which contains a unique pidfile
496        # path (unique because it contains the job id) making it hashable.
497        # All pidfiles are stored in the drone managers _pidfiles dict as:
498        #   {pidfile_id: pidfile_contents(Process(drone, pid),
499        #                                 exit_code, num_tests_failed)}
500        # In handle agents, each agent knows its pidfile_id, and uses this
501        # to retrieve the refreshed contents of its pidfile via the
502        # PidfileRunMonitor (through its tick) before making decisions. If
503        # the agent notices that its process has exited, it unregisters the
504        # pidfile from the drone_managers._registered_pidfile_info dict
505        # through its epilog.
506        for drone, results_list in all_results.iteritems():
507            results = results_list[0]
508            drone_hostname = drone.hostname.replace('.', '_')
509
510            for process_info in results['all_processes']:
511                if process_info['comm'] == 'autoserv':
512                    self._add_autoserv_process(drone, process_info)
513                drone_pid = drone.hostname, int(process_info['pid'])
514                self._all_processes[drone_pid] = process_info
515
516            for process_info in results['parse_processes']:
517                self._add_process(drone, process_info)
518
519            self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
520            self._process_pidfiles(drone, results['pidfiles_second_read'],
521                                   self._pidfiles_second_read)
522
523            self._compute_active_processes(drone)
524            if drone.enabled:
525                self._enqueue_drone(drone)
526                self._check_drone_process_limit(drone)
527
528
529    def refresh(self):
530        """Refresh all drones."""
531        with metrics.SecondsTimer(
532                'chromeos/autotest/drone_manager/refresh_duration'):
533            self.trigger_refresh()
534            self.sync_refresh()
535
536
537    @metrics.SecondsTimerDecorator(
538        'chromeos/autotest/drone_manager/execute_actions_duration')
539    def execute_actions(self):
540        """
541        Called at the end of a scheduler cycle to execute all queued actions
542        on drones.
543        """
544        # Invoke calls queued on all drones since the last call to execute
545        # and wait for them to return.
546        if _THREADED_DRONE_MANAGER:
547            thread_lib.ThreadedTaskQueue(
548                    name='%s.execute_queue' % self._STATS_KEY).execute(
549                            self._drones.values())
550        else:
551            drone_task_queue.DroneTaskQueue().execute(self._drones.values())
552
553        try:
554            self._results_drone.execute_queued_calls()
555        except error.AutoservError:
556            m = 'chromeos/autotest/errors/results_repository_failed'
557            metrics.Counter(m).increment(
558                fields={'drone_hostname': self._results_drone.hostname})
559            self._results_drone.clear_call_queue()
560
561
562    def get_orphaned_autoserv_processes(self):
563        """
564        Returns a set of Process objects for orphaned processes only.
565        """
566        return set(process for process in self._process_set
567                   if process.ppid == 1)
568
569
570    def kill_process(self, process):
571        """
572        Kill the given process.
573        """
574        logging.info('killing %s', process)
575        drone = self._get_drone_for_process(process)
576        drone.queue_call('kill_process', process)
577
578
579    def _ensure_directory_exists(self, path):
580        if not os.path.exists(path):
581            os.makedirs(path)
582
583
584    def total_running_processes(self):
585        return sum(drone.active_processes for drone in self.get_drones())
586
587
588    def max_runnable_processes(self, username, drone_hostnames_allowed):
589        """
590        Return the maximum number of processes that can be run (in a single
591        execution) given the current load on drones.
592        @param username: login of user to run a process.  may be None.
593        @param drone_hostnames_allowed: list of drones that can be used. May be
594                                        None
595        """
596        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
597                                 if wrapper.drone.usable_by(username) and
598                                 (drone_hostnames_allowed is None or
599                                          wrapper.drone.hostname in
600                                                  drone_hostnames_allowed)]
601        if not usable_drone_wrappers:
602            # all drones disabled or inaccessible
603            return 0
604        runnable_processes = [
605                wrapper.drone.max_processes - wrapper.drone.active_processes
606                for wrapper in usable_drone_wrappers]
607        return max([0] + runnable_processes)
608
609
610    def _least_loaded_drone(self, drones):
611        drone_to_use = drones[0]
612        for drone in drones[1:]:
613            if drone.used_capacity() < drone_to_use.used_capacity():
614                drone_to_use = drone
615        return drone_to_use
616
617
618    def _choose_drone_for_execution(self, num_processes, username,
619                                    drone_hostnames_allowed,
620                                    require_ssp=False):
621        """Choose a drone to execute command.
622
623        @param num_processes: Number of processes needed for execution.
624        @param username: Name of the user to execute the command.
625        @param drone_hostnames_allowed: A list of names of drone allowed.
626        @param require_ssp: Require server-side packaging to execute the,
627                            command, default to False.
628
629        @return: A drone object to be used for execution.
630        """
631        # cycle through drones is order of increasing used capacity until
632        # we find one that can handle these processes
633        checked_drones = []
634        usable_drones = []
635        # Drones do not support server-side packaging, used as backup if no
636        # drone is found to run command requires server-side packaging.
637        no_ssp_drones = []
638        drone_to_use = None
639        while self._drone_queue:
640            drone = heapq.heappop(self._drone_queue).drone
641            checked_drones.append(drone)
642            logging.info('Checking drone %s', drone.hostname)
643            if not drone.usable_by(username):
644                continue
645
646            drone_allowed = (drone_hostnames_allowed is None
647                             or drone.hostname in drone_hostnames_allowed)
648            if not drone_allowed:
649                logging.debug('Drone %s not allowed: ', drone.hostname)
650                continue
651            if require_ssp and not drone.support_ssp:
652                logging.debug('Drone %s does not support server-side '
653                              'packaging.', drone.hostname)
654                no_ssp_drones.append(drone)
655                continue
656
657            usable_drones.append(drone)
658
659            if drone.active_processes + num_processes <= drone.max_processes:
660                drone_to_use = drone
661                break
662            logging.info('Drone %s has %d active + %s requested > %s max',
663                         drone.hostname, drone.active_processes, num_processes,
664                         drone.max_processes)
665
666        if not drone_to_use and usable_drones:
667            # Drones are all over loaded, pick the one with least load.
668            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
669                                                   drone.active_processes,
670                                                   drone.max_processes)
671                                     for drone in usable_drones)
672            logging.error('No drone has capacity to handle %d processes (%s) '
673                          'for user %s', num_processes, drone_summary, username)
674            drone_to_use = self._least_loaded_drone(usable_drones)
675        elif not drone_to_use and require_ssp and no_ssp_drones:
676            # No drone supports server-side packaging, choose the least loaded.
677            drone_to_use = self._least_loaded_drone(no_ssp_drones)
678
679        # refill _drone_queue
680        for drone in checked_drones:
681            self._enqueue_drone(drone)
682
683        return drone_to_use
684
685
686    def _substitute_working_directory_into_command(self, command,
687                                                   working_directory):
688        for i, item in enumerate(command):
689            if item is WORKING_DIRECTORY:
690                command[i] = working_directory
691
692
693    def execute_command(self, command, working_directory, pidfile_name,
694                        num_processes, log_file=None, paired_with_pidfile=None,
695                        username=None, drone_hostnames_allowed=None):
696        """
697        Execute the given command, taken as an argv list.
698
699        @param command: command to execute as a list.  if any item is
700                WORKING_DIRECTORY, the absolute path to the working directory
701                will be substituted for it.
702        @param working_directory: directory in which the pidfile will be written
703        @param pidfile_name: name of the pidfile this process will write
704        @param num_processes: number of processes to account for from this
705                execution
706        @param log_file (optional): path (in the results repository) to hold
707                command output.
708        @param paired_with_pidfile (optional): a PidfileId for an
709                already-executed process; the new process will execute on the
710                same drone as the previous process.
711        @param username (optional): login of the user responsible for this
712                process.
713        @param drone_hostnames_allowed (optional): hostnames of the drones that
714                                                   this command is allowed to
715                                                   execute on
716        """
717        abs_working_directory = self.absolute_path(working_directory)
718        if not log_file:
719            log_file = self.get_temporary_path('execute')
720        log_file = self.absolute_path(log_file)
721
722        self._substitute_working_directory_into_command(command,
723                                                        abs_working_directory)
724
725        if paired_with_pidfile:
726            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
727        else:
728            require_ssp = '--require-ssp' in command
729            drone = self._choose_drone_for_execution(
730                    num_processes, username, drone_hostnames_allowed,
731                    require_ssp=require_ssp)
732            # Enable --warn-no-ssp option for autoserv to log a warning and run
733            # the command without using server-side packaging.
734            if require_ssp and not drone.support_ssp:
735                command.append('--warn-no-ssp')
736
737        if not drone:
738            raise DroneManagerError('command failed; no drones available: %s'
739                                    % command)
740
741        logging.info("command = %s", command)
742        logging.info('log file = %s:%s', drone.hostname, log_file)
743        self._write_attached_files(working_directory, drone)
744        drone.queue_call('execute_command', command, abs_working_directory,
745                         log_file, pidfile_name)
746        drone.active_processes += num_processes
747        self._reorder_drone_queue()
748
749        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
750        pidfile_id = PidfileId(pidfile_path)
751        self.register_pidfile(pidfile_id)
752        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
753        return pidfile_id
754
755
756    def get_pidfile_id_from(self, execution_tag, pidfile_name):
757        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
758        return PidfileId(path)
759
760
761    def register_pidfile(self, pidfile_id):
762        """
763        Indicate that the DroneManager should look for the given pidfile when
764        refreshing.
765        """
766        if pidfile_id not in self._registered_pidfile_info:
767            logging.info('monitoring pidfile %s', pidfile_id)
768            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
769        self._reset_pidfile_age(pidfile_id)
770
771
772    def _reset_pidfile_age(self, pidfile_id):
773        if pidfile_id in self._registered_pidfile_info:
774            self._registered_pidfile_info[pidfile_id].age = 0
775
776
777    def unregister_pidfile(self, pidfile_id):
778        if pidfile_id in self._registered_pidfile_info:
779            logging.info('forgetting pidfile %s', pidfile_id)
780            del self._registered_pidfile_info[pidfile_id]
781
782
783    def declare_process_count(self, pidfile_id, num_processes):
784        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
785
786
787    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
788        """
789        Retrieve a PidfileContents object for the given pidfile_id.  If
790        use_second_read is True, use results that were read after the processes
791        were checked, instead of before.
792        """
793        self._reset_pidfile_age(pidfile_id)
794        if use_second_read:
795            pidfile_map = self._pidfiles_second_read
796        else:
797            pidfile_map = self._pidfiles
798        return pidfile_map.get(pidfile_id, PidfileContents())
799
800
801    def is_process_running(self, process):
802        """
803        Check if the given process is in the running process list.
804        """
805        if process in self._process_set:
806            return True
807
808        drone_pid = process.hostname, process.pid
809        if drone_pid in self._all_processes:
810            logging.error('Process %s found, but not an autoserv process. '
811                    'Is %s', process, self._all_processes[drone_pid])
812            return True
813
814        return False
815
816
817    def get_temporary_path(self, base_name):
818        """
819        Get a new temporary path guaranteed to be unique across all drones
820        for this scheduler execution.
821        """
822        self._temporary_path_counter += 1
823        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
824                            '%s.%s' % (base_name, self._temporary_path_counter))
825
826
827    def absolute_path(self, path, on_results_repository=False):
828        if on_results_repository:
829            base_dir = self._results_dir
830        else:
831            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
832                                    _DRONE_RESULTS_DIR_SUFFIX)
833        return os.path.join(base_dir, path)
834
835
836    def _copy_results_helper(self, process, source_path, destination_path,
837                             to_results_repository=False):
838        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
839                      'destination_path: %s, to_results_repository: %s',
840                      process, source_path, destination_path,
841                      to_results_repository)
842        full_source = self.absolute_path(source_path)
843        full_destination = self.absolute_path(
844                destination_path, on_results_repository=to_results_repository)
845        source_drone = self._get_drone_for_process(process)
846        if to_results_repository:
847            source_drone.send_file_to(self._results_drone, full_source,
848                                      full_destination, can_fail=True)
849        else:
850            source_drone.queue_call('copy_file_or_directory', full_source,
851                                    full_destination)
852
853
854    def copy_to_results_repository(self, process, source_path,
855                                   destination_path=None):
856        """
857        Copy results from the given process at source_path to destination_path
858        in the results repository.
859        """
860        if destination_path is None:
861            destination_path = source_path
862        self._copy_results_helper(process, source_path, destination_path,
863                                  to_results_repository=True)
864
865
    def copy_results_on_drone(self, process, source_path, destination_path):
        """
        Copy a results directory from one place to another on the drone.

        Delegates to _copy_results_helper() without asking for the results
        repository, so the copy is queued on the drone that runs process and
        both paths are resolved as drone paths.
        """
        self._copy_results_helper(process, source_path, destination_path)
871
872
873    def _write_attached_files(self, results_dir, drone):
874        attached_files = self._attached_files.pop(results_dir, {})
875        for file_path, contents in attached_files.iteritems():
876            drone.queue_call('write_to_file', self.absolute_path(file_path),
877                             contents)
878
879
880    def attach_file_to_execution(self, results_dir, file_contents,
881                                 file_path=None):
882        """
883        When the process for the results directory is executed, the given file
884        contents will be placed in a file on the drone.  Returns the path at
885        which the file will be placed.
886        """
887        if not file_path:
888            file_path = self.get_temporary_path('attach')
889        files_for_execution = self._attached_files.setdefault(results_dir, {})
890        assert file_path not in files_for_execution
891        files_for_execution[file_path] = file_contents
892        return file_path
893
894
895    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
896        """
897        Write the given lines (as a list of strings) to a file.  If
898        paired_with_process is given, the file will be written on the drone
899        running the given Process.  Otherwise, the file will be written to the
900        results repository.
901        """
902        file_contents = '\n'.join(lines) + '\n'
903        if paired_with_process:
904            drone = self._get_drone_for_process(paired_with_process)
905            on_results_repository = False
906        else:
907            drone = self._results_drone
908            on_results_repository = True
909        full_path = self.absolute_path(
910                file_path, on_results_repository=on_results_repository)
911        drone.queue_call('write_to_file', full_path, file_contents)
912
913
# Base class of DroneManager, resolved through utils.import_site_class() from
# the module autotest_lib.scheduler.site_drone_manager, with BaseDroneManager
# passed as the last argument.
# NOTE(review): presumably BaseDroneManager is what comes back when no site
# class is available -- confirm against utils.import_site_class.
SiteDroneManager = utils.import_site_class(
   __file__, 'autotest_lib.scheduler.site_drone_manager',
   'SiteDroneManager', BaseDroneManager)
917
918
class DroneManager(SiteDroneManager):
    """
    Drone manager class instantiated by instance() in this module.  It adds
    nothing of its own on top of SiteDroneManager.
    """
    pass
921
922
# Module-wide DroneManager singleton; created lazily by instance().
_the_instance = None

def instance():
    """
    Return the module-wide drone manager, creating a DroneManager on first
    use if none has been set.
    """
    if _the_instance is not None:
        return _the_instance
    _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    """
    Replace the module-wide drone manager with the given object.
    """
    global _the_instance
    _the_instance = instance
934