1import collections
2import heapq
3import os
4import Queue
5import time
6import threading
7import traceback
8import logging
9
10import common
11from autotest_lib.client.common_lib import error, global_config, utils
12from autotest_lib.client.common_lib.cros.graphite import autotest_stats
13from autotest_lib.scheduler import email_manager, drone_utility, drones
14from autotest_lib.scheduler import drone_task_queue
15from autotest_lib.scheduler import scheduler_config
16from autotest_lib.scheduler import thread_lib
17
18
# results on drones will be placed under the drone_installation_directory in a
# directory with this name
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Unique sentinel object.  Any item of a command list that *is* this object is
# replaced by the absolute working directory path in execute_command().
WORKING_DIRECTORY = object() # see execute_command()


# Names of the pidfiles that executions write into their working directory
# (presumably one per kind of process: autoserv, crashinfo collection, parser
# and archiver -- inferred from the names, confirm against the callers).
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

# Every pidfile name defined above, gathered in one tuple.
ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
                     ARCHIVER_PID_FILE)

# Read once at import time from the scheduler section of the global config.
# When true (the default) the manager uses thread_lib.ThreadedTaskQueue to
# run drone calls, otherwise drone_task_queue.DroneTaskQueue.
_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
        type=bool, default=True)
37
38
class DroneManagerError(Exception):
    """Error raised by the drone manager itself.

    Used in this module when no valid drone could be set up, when a drone
    still has un-executed calls at refresh time and when no drone is
    available to execute a command.
    """
41
42
class CustomEquals(object):
    """Base class whose equality and hash are derived from _id().

    Subclasses implement _id() to return a hashable value; two instances are
    equal when |other| is an instance of this object's type and both return
    equal ids.
    """

    def _id(self):
        """Return the value identifying this object; must be overridden."""
        raise NotImplementedError


    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self._id() == other._id()
        # let Python try the reflected comparison / default behaviour
        return NotImplemented


    def __ne__(self, other):
        return not (self == other)


    def __hash__(self):
        # keep the hash consistent with __eq__
        return hash(self._id())
60
61
class Process(CustomEquals):
    """A process on a drone, identified by its (hostname, pid) pair."""

    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        # ppid is not part of the identity
        return self.hostname, self.pid


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        default_repr = super(Process, self).__repr__()
        return default_repr + '<%s>' % self
78
79
class PidfileId(CustomEquals):
    """Identifies a pidfile; two ids are equal when their paths are equal."""

    def __init__(self, path):
        self.path = path


    def _id(self):
        """Return the path, which serves as the identity."""
        return self.path


    def __str__(self):
        return str(self.path)
91
92
93class _PidfileInfo(object):
94    age = 0
95    num_processes = None
96
97
class PidfileContents(object):
    """Parsed contents of a pidfile.

    Every attribute stays None until the corresponding value has been read
    from the pidfile.
    """
    # the process that wrote the pidfile
    process = None
    # exit status of that process; None while it has not been written
    exit_status = None
    # number of failed tests reported alongside the exit status
    num_tests_failed = None

    def is_invalid(self):
        """Parsed contents are always valid; see InvalidPidfile."""
        return False


    def is_running(self):
        """Tell whether the process has started and has not exited yet.

        @returns: True if a process is known and no exit status has been
                  recorded, False otherwise.
        """
        # Compare against None explicitly: an exit status of 0 means the
        # process finished successfully, it must not count as "running".
        return self.process is not None and self.exit_status is None
109
110
class InvalidPidfile(object):
    """Stand-in for PidfileContents when a pidfile could not be parsed.

    Offers the same attributes and methods, plus the parse error message.
    """
    process = exit_status = num_tests_failed = None


    def __init__(self, error):
        # description of what was wrong with the pidfile
        self.error = error


    def is_invalid(self):
        return True


    def is_running(self):
        return False


    def __str__(self):
        return self.error
131
132
133class _DroneHeapWrapper(object):
134    """Wrapper to compare drones based on used_capacity().
135
136    These objects can be used to keep a heap of drones by capacity.
137    """
138    def __init__(self, drone):
139        self.drone = drone
140
141
142    def __cmp__(self, other):
143        assert isinstance(other, _DroneHeapWrapper)
144        return cmp(self.drone.used_capacity(), other.drone.used_capacity())
145
146
147class BaseDroneManager(object):
148    """
149    This class acts as an interface from the scheduler to drones, whether it be
150    only a single "drone" for localhost or multiple remote drones.
151
152    All paths going into and out of this class are relative to the full results
    directory, except for those returned by absolute_path().
154    """
155
156
157    # Minimum time to wait before next email
158    # about a drone hitting process limit is sent.
159    NOTIFY_INTERVAL = 60 * 60 * 24 # one day
160    _STATS_KEY = 'drone_manager'
161    _timer = autotest_stats.Timer(_STATS_KEY)
162
163
164    def __init__(self):
165        # absolute path of base results dir
166        self._results_dir = None
167        # holds Process objects
168        self._process_set = set()
169        # holds the list of all processes running on all drones
170        self._all_processes = {}
171        # maps PidfileId to PidfileContents
172        self._pidfiles = {}
173        # same as _pidfiles
174        self._pidfiles_second_read = {}
175        # maps PidfileId to _PidfileInfo
176        self._registered_pidfile_info = {}
177        # used to generate unique temporary paths
178        self._temporary_path_counter = 0
179        # maps hostname to Drone object
180        self._drones = {}
181        self._results_drone = None
182        # maps results dir to dict mapping file path to contents
183        self._attached_files = {}
184        # heapq of _DroneHeapWrappers
185        self._drone_queue = []
186        # map drone hostname to time stamp of email that
187        # has been sent about the drone hitting process limit.
188        self._notify_record = {}
189        # A threaded task queue used to refresh drones asynchronously.
190        if _THREADED_DRONE_MANAGER:
191            self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
192                    name='%s.refresh_queue' % self._STATS_KEY)
193        else:
194            self._refresh_task_queue = drone_task_queue.DroneTaskQueue()
195
196
197    def initialize(self, base_results_dir, drone_hostnames,
198                   results_repository_hostname):
199        self._results_dir = base_results_dir
200
201        for hostname in drone_hostnames:
202            self._add_drone(hostname)
203
204        if not self._drones:
205            # all drones failed to initialize
206            raise DroneManagerError('No valid drones found')
207
208        self.refresh_drone_configs()
209
210        logging.info('Using results repository on %s',
211                     results_repository_hostname)
212        self._results_drone = drones.get_drone(results_repository_hostname)
213        results_installation_dir = global_config.global_config.get_config_value(
214                scheduler_config.CONFIG_SECTION,
215                'results_host_installation_directory', default=None)
216        if results_installation_dir:
217            self._results_drone.set_autotest_install_dir(
218                    results_installation_dir)
219        # don't initialize() the results drone - we don't want to clear out any
220        # directories and we don't need to kill any processes
221
222
223    def reinitialize_drones(self):
224        self._call_all_drones('initialize', self._results_dir)
225
226
227    def shutdown(self):
228        for drone in self.get_drones():
229            drone.shutdown()
230
231
232    def _get_max_pidfile_refreshes(self):
233        """
234        Normally refresh() is called on every monitor_db.Dispatcher.tick().
235
236        @returns: The number of refresh() calls before we forget a pidfile.
237        """
238        pidfile_timeout = global_config.global_config.get_config_value(
239                scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
240                type=int, default=2000)
241        return pidfile_timeout
242
243
244    def _add_drone(self, hostname):
245        logging.info('Adding drone %s', hostname)
246        drone = drones.get_drone(hostname)
247        if drone:
248            self._drones[drone.hostname] = drone
249            drone.call('initialize', self.absolute_path(''))
250
251
252    def _remove_drone(self, hostname):
253        self._drones.pop(hostname, None)
254
255
256    def refresh_drone_configs(self):
257        """
258        Reread global config options for all drones.
259        """
260        # Import server_manager_utils is delayed rather than at the beginning of
261        # this module. The reason is that test_that imports drone_manager when
262        # importing autoserv_utils. The import is done before test_that setup
263        # django (test_that only setup django in setup_local_afe, since it's
264        # not needed when test_that runs the test in a lab duts through :lab:
265        # option. Therefore, if server_manager_utils is imported at the
266        # beginning of this module, test_that will fail since django is not
267        # setup yet.
268        from autotest_lib.site_utils import server_manager_utils
269        config = global_config.global_config
270        section = scheduler_config.CONFIG_SECTION
271        config.parse_config_file()
272        for hostname, drone in self._drones.iteritems():
273            if server_manager_utils.use_server_db():
274                server = server_manager_utils.get_servers(hostname=hostname)[0]
275                attributes = dict([(a.attribute, a.value)
276                                   for a in server.attributes.all()])
277                drone.enabled = (
278                        int(attributes.get('disabled', 0)) == 0)
279                drone.max_processes = int(
280                        attributes.get(
281                            'max_processes',
282                            scheduler_config.config.max_processes_per_drone))
283                allowed_users = attributes.get('users', None)
284            else:
285                disabled = config.get_config_value(
286                        section, '%s_disabled' % hostname, default='')
287                drone.enabled = not bool(disabled)
288                drone.max_processes = config.get_config_value(
289                        section, '%s_max_processes' % hostname, type=int,
290                        default=scheduler_config.config.max_processes_per_drone)
291
292                allowed_users = config.get_config_value(
293                        section, '%s_users' % hostname, default=None)
294            if allowed_users:
295                drone.allowed_users = set(allowed_users.split())
296            else:
297                drone.allowed_users = None
298            logging.info('Drone %s.max_processes: %s', hostname,
299                         drone.max_processes)
300            logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
301            logging.info('Drone %s.allowed_users: %s', hostname,
302                         drone.allowed_users)
303            logging.info('Drone %s.support_ssp: %s', hostname,
304                         drone.support_ssp)
305
306        self._reorder_drone_queue() # max_processes may have changed
307        # Clear notification record about reaching max_processes limit.
308        self._notify_record = {}
309
310
311    def get_drones(self):
312        return self._drones.itervalues()
313
314
315    def cleanup_orphaned_containers(self):
316        """Queue cleanup_orphaned_containers call at each drone.
317        """
318        for drone in self._drones.values():
319            logging.info('Queue cleanup_orphaned_containers at %s',
320                         drone.hostname)
321            drone.queue_call('cleanup_orphaned_containers')
322
323
324    def _get_drone_for_process(self, process):
325        return self._drones[process.hostname]
326
327
328    def _get_drone_for_pidfile_id(self, pidfile_id):
329        pidfile_contents = self.get_pidfile_contents(pidfile_id)
330        assert pidfile_contents.process is not None
331        return self._get_drone_for_process(pidfile_contents.process)
332
333
334    def _drop_old_pidfiles(self):
335        # use items() since the dict is modified in unregister_pidfile()
336        for pidfile_id, info in self._registered_pidfile_info.items():
337            if info.age > self._get_max_pidfile_refreshes():
338                logging.warning('dropping leaked pidfile %s', pidfile_id)
339                self.unregister_pidfile(pidfile_id)
340            else:
341                info.age += 1
342
343
344    def _reset(self):
345        self._process_set = set()
346        self._all_processes = {}
347        self._pidfiles = {}
348        self._pidfiles_second_read = {}
349        self._drone_queue = []
350
351
352    def _call_all_drones(self, method, *args, **kwargs):
353        all_results = {}
354        for drone in self.get_drones():
355            with self._timer.get_client(
356                    '%s.%s' % (drone.hostname.replace('.', '_'), method)):
357                all_results[drone] = drone.call(method, *args, **kwargs)
358        return all_results
359
360
361    def _parse_pidfile(self, drone, raw_contents):
362        """Parse raw pidfile contents.
363
364        @param drone: The drone on which this pidfile was found.
365        @param raw_contents: The raw contents of a pidfile, eg:
366            "pid\nexit_staus\nnum_tests_failed\n".
367        """
368        contents = PidfileContents()
369        if not raw_contents:
370            return contents
371        lines = raw_contents.splitlines()
372        if len(lines) > 3:
373            return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
374                                  (len(lines), lines))
375        try:
376            pid = int(lines[0])
377            contents.process = Process(drone.hostname, pid)
378            # if len(lines) == 2, assume we caught Autoserv between writing
379            # exit_status and num_failed_tests, so just ignore it and wait for
380            # the next cycle
381            if len(lines) == 3:
382                contents.exit_status = int(lines[1])
383                contents.num_tests_failed = int(lines[2])
384        except ValueError, exc:
385            return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
386
387        return contents
388
389
390    def _process_pidfiles(self, drone, pidfiles, store_in_dict):
391        for pidfile_path, contents in pidfiles.iteritems():
392            pidfile_id = PidfileId(pidfile_path)
393            contents = self._parse_pidfile(drone, contents)
394            store_in_dict[pidfile_id] = contents
395
396
397    def _add_process(self, drone, process_info):
398        process = Process(drone.hostname, int(process_info['pid']),
399                          int(process_info['ppid']))
400        self._process_set.add(process)
401
402
403    def _add_autoserv_process(self, drone, process_info):
404        assert process_info['comm'] == 'autoserv'
405        # only root autoserv processes have pgid == pid
406        if process_info['pgid'] != process_info['pid']:
407            return
408        self._add_process(drone, process_info)
409
410
411    def _enqueue_drone(self, drone):
412        heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))
413
414
415    def _reorder_drone_queue(self):
416        heapq.heapify(self._drone_queue)
417
418
419    def _compute_active_processes(self, drone):
420        drone.active_processes = 0
421        for pidfile_id, contents in self._pidfiles.iteritems():
422            is_running = contents.exit_status is None
423            on_this_drone = (contents.process
424                             and contents.process.hostname == drone.hostname)
425            if is_running and on_this_drone:
426                info = self._registered_pidfile_info[pidfile_id]
427                if info.num_processes is not None:
428                    drone.active_processes += info.num_processes
429        autotest_stats.Gauge(self._STATS_KEY).send(
430                '%s.%s' % (drone.hostname.replace('.', '_'),
431                           'active_processes'), drone.active_processes)
432
433
434    def _check_drone_process_limit(self, drone):
435        """
436        Notify if the number of processes on |drone| is approaching limit.
437
438        @param drone: A Drone object.
439        """
440        try:
441            percent = float(drone.active_processes) / drone.max_processes
442        except ZeroDivisionError:
443            percent = 100
444        max_percent = scheduler_config.config.max_processes_warning_threshold
445        if percent >= max_percent:
446            message = ('Drone %s is hitting %s of process limit.' %
447                       (drone.hostname, format(percent, '.2%')))
448            logging.warning(message)
449            last_notified = self._notify_record.get(drone.hostname, 0)
450            now = time.time()
451            if last_notified + BaseDroneManager.NOTIFY_INTERVAL < now:
452                body = ('Active processes/Process limit: %d/%d (%s)' %
453                        (drone.active_processes, drone.max_processes,
454                         format(percent, '.2%')))
455                email_manager.manager.enqueue_notify_email(message, body)
456                self._notify_record[drone.hostname] = now
457
458
459    def trigger_refresh(self):
460        """Triggers a drone manager refresh.
461
462        @raises DroneManagerError: If a drone has un-executed calls.
463            Since they will get clobbered when we queue refresh calls.
464        """
465        self._reset()
466        self._drop_old_pidfiles()
467        pidfile_paths = [pidfile_id.path
468                         for pidfile_id in self._registered_pidfile_info]
469        drones = list(self.get_drones())
470        for drone in drones:
471            calls = drone.get_calls()
472            if calls:
473                raise DroneManagerError('Drone %s has un-executed calls: %s '
474                                        'which might get corrupted through '
475                                        'this invocation' %
476                                        (drone, [str(call) for call in calls]))
477            drone.queue_call('refresh', pidfile_paths)
478        logging.info("Invoking drone refresh.")
479        with self._timer.get_client('trigger_refresh'):
480            self._refresh_task_queue.execute(drones, wait=False)
481
482
    def sync_refresh(self):
        """Complete the drone refresh started by trigger_refresh.

        Waits for all drone threads then refreshes internal datastructures
        with drone process information: the process set, the map of all
        processes, both pidfile maps, each drone's active process count and
        the queue of enabled drones.
        """

        # This gives us a dictionary like what follows:
        # {drone: [{'pidfiles': (raw contents of pidfile paths),
        #           'autoserv_processes': (autoserv process info from ps),
        #           'all_processes': (all process info from ps),
        #           'parse_processes': (parse process info from ps),
        #           'pidfiles_second_read': (pidfile contents, again),}]
        #   drone2: ...}
        # The values of each drone are only a list because this adheres to the
        # drone utility interface (each call is executed and its results are
        # placed in a list, but since we never couple the refresh calls with
        # any other call, this list will always contain a single dict).
        with self._timer.get_client('sync_refresh'):
            all_results = self._refresh_task_queue.get_results()
        logging.info("Drones refreshed.")

        # The loop below goes through and parses pidfile contents. Pidfiles
        # are used to track autoserv execution, and will always contain at
        # most 3 lines: pid, exit code, number of tests failed. Each pidfile
        # is identified by a PidfileId object, which contains a unique pidfile
        # path (unique because it contains the job id) making it hashable.
        # All pidfiles are stored in the drone manager's _pidfiles dict as:
        #   {pidfile_id: pidfile_contents(Process(drone, pid),
        #                                 exit_code, num_tests_failed)}
        # In handle agents, each agent knows its pidfile_id, and uses this
        # to retrieve the refreshed contents of its pidfile via the
        # PidfileRunMonitor (through its tick) before making decisions. If
        # the agent notices that its process has exited, it unregisters the
        # pidfile from the drone manager's _registered_pidfile_info dict
        # through its epilog.
        for drone, results_list in all_results.iteritems():
            # only the refresh call was queued, so there is a single result
            results = results_list[0]
            # dots in the hostname are replaced for the stats keys
            drone_hostname = drone.hostname.replace('.', '_')

            with self._timer.get_client('%s.results' % drone_hostname):
                for process_info in results['all_processes']:
                    if process_info['comm'] == 'autoserv':
                        self._add_autoserv_process(drone, process_info)
                    # every process is recorded under (hostname, pid)
                    drone_pid = drone.hostname, int(process_info['pid'])
                    self._all_processes[drone_pid] = process_info

                for process_info in results['parse_processes']:
                    self._add_process(drone, process_info)

            with self._timer.get_client('%s.pidfiles' % drone_hostname):
                self._process_pidfiles(drone, results['pidfiles'],
                                       self._pidfiles)
            with self._timer.get_client('%s.pidfiles_second' % drone_hostname):
                self._process_pidfiles(drone, results['pidfiles_second_read'],
                                       self._pidfiles_second_read)

            self._compute_active_processes(drone)
            # disabled drones are not put on the queue
            if drone.enabled:
                self._enqueue_drone(drone)
                self._check_drone_process_limit(drone)
544
545
546    def refresh(self):
547        """Refresh all drones."""
548        with self._timer.get_client('refresh'):
549            self.trigger_refresh()
550            self.sync_refresh()
551
552
553    def execute_actions(self):
554        """
555        Called at the end of a scheduler cycle to execute all queued actions
556        on drones.
557        """
558        # Invoke calls queued on all drones since the last call to execute
559        # and wait for them to return.
560        if _THREADED_DRONE_MANAGER:
561            thread_lib.ThreadedTaskQueue(
562                    name='%s.execute_queue' % self._STATS_KEY).execute(
563                            self._drones.values())
564        else:
565            drone_task_queue.DroneTaskQueue().execute(self._drones.values())
566
567        try:
568            self._results_drone.execute_queued_calls()
569        except error.AutoservError:
570            warning = ('Results repository failed to execute calls:\n' +
571                       traceback.format_exc())
572            email_manager.manager.enqueue_notify_email(
573                'Results repository error', warning)
574            self._results_drone.clear_call_queue()
575
576
577    def get_orphaned_autoserv_processes(self):
578        """
579        Returns a set of Process objects for orphaned processes only.
580        """
581        return set(process for process in self._process_set
582                   if process.ppid == 1)
583
584
585    def kill_process(self, process):
586        """
587        Kill the given process.
588        """
589        logging.info('killing %s', process)
590        drone = self._get_drone_for_process(process)
591        drone.queue_call('kill_process', process)
592
593
594    def _ensure_directory_exists(self, path):
595        if not os.path.exists(path):
596            os.makedirs(path)
597
598
599    def total_running_processes(self):
600        return sum(drone.active_processes for drone in self.get_drones())
601
602
603    def max_runnable_processes(self, username, drone_hostnames_allowed):
604        """
605        Return the maximum number of processes that can be run (in a single
606        execution) given the current load on drones.
607        @param username: login of user to run a process.  may be None.
608        @param drone_hostnames_allowed: list of drones that can be used. May be
609                                        None
610        """
611        usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
612                                 if wrapper.drone.usable_by(username) and
613                                 (drone_hostnames_allowed is None or
614                                          wrapper.drone.hostname in
615                                                  drone_hostnames_allowed)]
616        if not usable_drone_wrappers:
617            # all drones disabled or inaccessible
618            return 0
619        runnable_processes = [
620                wrapper.drone.max_processes - wrapper.drone.active_processes
621                for wrapper in usable_drone_wrappers]
622        return max([0] + runnable_processes)
623
624
625    def _least_loaded_drone(self, drones):
626        drone_to_use = drones[0]
627        for drone in drones[1:]:
628            if drone.used_capacity() < drone_to_use.used_capacity():
629                drone_to_use = drone
630        return drone_to_use
631
632
633    def _choose_drone_for_execution(self, num_processes, username,
634                                    drone_hostnames_allowed,
635                                    require_ssp=False):
636        """Choose a drone to execute command.
637
638        @param num_processes: Number of processes needed for execution.
639        @param username: Name of the user to execute the command.
640        @param drone_hostnames_allowed: A list of names of drone allowed.
641        @param require_ssp: Require server-side packaging to execute the,
642                            command, default to False.
643
644        @return: A drone object to be used for execution.
645        """
646        # cycle through drones is order of increasing used capacity until
647        # we find one that can handle these processes
648        checked_drones = []
649        usable_drones = []
650        # Drones do not support server-side packaging, used as backup if no
651        # drone is found to run command requires server-side packaging.
652        no_ssp_drones = []
653        drone_to_use = None
654        while self._drone_queue:
655            drone = heapq.heappop(self._drone_queue).drone
656            checked_drones.append(drone)
657            logging.info('Checking drone %s', drone.hostname)
658            if not drone.usable_by(username):
659                continue
660
661            drone_allowed = (drone_hostnames_allowed is None
662                             or drone.hostname in drone_hostnames_allowed)
663            if not drone_allowed:
664                logging.debug('Drone %s not allowed: ', drone.hostname)
665                continue
666            if require_ssp and not drone.support_ssp:
667                logging.debug('Drone %s does not support server-side '
668                              'packaging.', drone.hostname)
669                no_ssp_drones.append(drone)
670                continue
671
672            usable_drones.append(drone)
673
674            if drone.active_processes + num_processes <= drone.max_processes:
675                drone_to_use = drone
676                break
677            logging.info('Drone %s has %d active + %s requested > %s max',
678                         drone.hostname, drone.active_processes, num_processes,
679                         drone.max_processes)
680
681        if not drone_to_use and usable_drones:
682            # Drones are all over loaded, pick the one with least load.
683            drone_summary = ','.join('%s %s/%s' % (drone.hostname,
684                                                   drone.active_processes,
685                                                   drone.max_processes)
686                                     for drone in usable_drones)
687            logging.error('No drone has capacity to handle %d processes (%s) '
688                          'for user %s', num_processes, drone_summary, username)
689            drone_to_use = self._least_loaded_drone(usable_drones)
690        elif not drone_to_use and require_ssp and no_ssp_drones:
691            # No drone supports server-side packaging, choose the least loaded.
692            drone_to_use = self._least_loaded_drone(no_ssp_drones)
693
694        # refill _drone_queue
695        for drone in checked_drones:
696            self._enqueue_drone(drone)
697
698        return drone_to_use
699
700
701    def _substitute_working_directory_into_command(self, command,
702                                                   working_directory):
703        for i, item in enumerate(command):
704            if item is WORKING_DIRECTORY:
705                command[i] = working_directory
706
707
708    def execute_command(self, command, working_directory, pidfile_name,
709                        num_processes, log_file=None, paired_with_pidfile=None,
710                        username=None, drone_hostnames_allowed=None):
711        """
712        Execute the given command, taken as an argv list.
713
714        @param command: command to execute as a list.  if any item is
715                WORKING_DIRECTORY, the absolute path to the working directory
716                will be substituted for it.
717        @param working_directory: directory in which the pidfile will be written
718        @param pidfile_name: name of the pidfile this process will write
719        @param num_processes: number of processes to account for from this
720                execution
721        @param log_file (optional): path (in the results repository) to hold
722                command output.
723        @param paired_with_pidfile (optional): a PidfileId for an
724                already-executed process; the new process will execute on the
725                same drone as the previous process.
726        @param username (optional): login of the user responsible for this
727                process.
728        @param drone_hostnames_allowed (optional): hostnames of the drones that
729                                                   this command is allowed to
730                                                   execute on
731        """
732        abs_working_directory = self.absolute_path(working_directory)
733        if not log_file:
734            log_file = self.get_temporary_path('execute')
735        log_file = self.absolute_path(log_file)
736
737        self._substitute_working_directory_into_command(command,
738                                                        abs_working_directory)
739
740        if paired_with_pidfile:
741            drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
742        else:
743            require_ssp = '--require-ssp' in command
744            drone = self._choose_drone_for_execution(
745                    num_processes, username, drone_hostnames_allowed,
746                    require_ssp=require_ssp)
747            # Enable --warn-no-ssp option for autoserv to log a warning and run
748            # the command without using server-side packaging.
749            if require_ssp and not drone.support_ssp:
750                command.append('--warn-no-ssp')
751
752        if not drone:
753            raise DroneManagerError('command failed; no drones available: %s'
754                                    % command)
755
756        logging.info("command = %s", command)
757        logging.info('log file = %s:%s', drone.hostname, log_file)
758        self._write_attached_files(working_directory, drone)
759        drone.queue_call('execute_command', command, abs_working_directory,
760                         log_file, pidfile_name)
761        drone.active_processes += num_processes
762        self._reorder_drone_queue()
763
764        pidfile_path = os.path.join(abs_working_directory, pidfile_name)
765        pidfile_id = PidfileId(pidfile_path)
766        self.register_pidfile(pidfile_id)
767        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
768        return pidfile_id
769
770
771    def get_pidfile_id_from(self, execution_tag, pidfile_name):
772        path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
773        return PidfileId(path)
774
775
776    def register_pidfile(self, pidfile_id):
777        """
778        Indicate that the DroneManager should look for the given pidfile when
779        refreshing.
780        """
781        if pidfile_id not in self._registered_pidfile_info:
782            logging.info('monitoring pidfile %s', pidfile_id)
783            self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
784        self._reset_pidfile_age(pidfile_id)
785
786
787    def _reset_pidfile_age(self, pidfile_id):
788        if pidfile_id in self._registered_pidfile_info:
789            self._registered_pidfile_info[pidfile_id].age = 0
790
791
792    def unregister_pidfile(self, pidfile_id):
793        if pidfile_id in self._registered_pidfile_info:
794            logging.info('forgetting pidfile %s', pidfile_id)
795            del self._registered_pidfile_info[pidfile_id]
796
797
798    def declare_process_count(self, pidfile_id, num_processes):
799        self._registered_pidfile_info[pidfile_id].num_processes = num_processes
800
801
802    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
803        """
804        Retrieve a PidfileContents object for the given pidfile_id.  If
805        use_second_read is True, use results that were read after the processes
806        were checked, instead of before.
807        """
808        self._reset_pidfile_age(pidfile_id)
809        if use_second_read:
810            pidfile_map = self._pidfiles_second_read
811        else:
812            pidfile_map = self._pidfiles
813        return pidfile_map.get(pidfile_id, PidfileContents())
814
815
816    def is_process_running(self, process):
817        """
818        Check if the given process is in the running process list.
819        """
820        if process in self._process_set:
821            return True
822
823        drone_pid = process.hostname, process.pid
824        if drone_pid in self._all_processes:
825            logging.error('Process %s found, but not an autoserv process. '
826                    'Is %s', process, self._all_processes[drone_pid])
827            return True
828
829        return False
830
831
832    def get_temporary_path(self, base_name):
833        """
834        Get a new temporary path guaranteed to be unique across all drones
835        for this scheduler execution.
836        """
837        self._temporary_path_counter += 1
838        return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
839                            '%s.%s' % (base_name, self._temporary_path_counter))
840
841
842    def absolute_path(self, path, on_results_repository=False):
843        if on_results_repository:
844            base_dir = self._results_dir
845        else:
846            base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
847                                    _DRONE_RESULTS_DIR_SUFFIX)
848        return os.path.join(base_dir, path)
849
850
851    def _copy_results_helper(self, process, source_path, destination_path,
852                             to_results_repository=False):
853        logging.debug('_copy_results_helper. process: %s, source_path: %s, '
854                      'destination_path: %s, to_results_repository: %s',
855                      process, source_path, destination_path,
856                      to_results_repository)
857        full_source = self.absolute_path(source_path)
858        full_destination = self.absolute_path(
859                destination_path, on_results_repository=to_results_repository)
860        source_drone = self._get_drone_for_process(process)
861        if to_results_repository:
862            source_drone.send_file_to(self._results_drone, full_source,
863                                      full_destination, can_fail=True)
864        else:
865            source_drone.queue_call('copy_file_or_directory', full_source,
866                                    full_destination)
867
868
869    def copy_to_results_repository(self, process, source_path,
870                                   destination_path=None):
871        """
872        Copy results from the given process at source_path to destination_path
873        in the results repository.
874        """
875        if destination_path is None:
876            destination_path = source_path
877        self._copy_results_helper(process, source_path, destination_path,
878                                  to_results_repository=True)
879
880
881    def copy_results_on_drone(self, process, source_path, destination_path):
882        """
883        Copy a results directory from one place to another on the drone.
884        """
885        self._copy_results_helper(process, source_path, destination_path)
886
887
888    def _write_attached_files(self, results_dir, drone):
889        attached_files = self._attached_files.pop(results_dir, {})
890        for file_path, contents in attached_files.iteritems():
891            drone.queue_call('write_to_file', self.absolute_path(file_path),
892                             contents)
893
894
895    def attach_file_to_execution(self, results_dir, file_contents,
896                                 file_path=None):
897        """
898        When the process for the results directory is executed, the given file
899        contents will be placed in a file on the drone.  Returns the path at
900        which the file will be placed.
901        """
902        if not file_path:
903            file_path = self.get_temporary_path('attach')
904        files_for_execution = self._attached_files.setdefault(results_dir, {})
905        assert file_path not in files_for_execution
906        files_for_execution[file_path] = file_contents
907        return file_path
908
909
910    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
911        """
912        Write the given lines (as a list of strings) to a file.  If
913        paired_with_process is given, the file will be written on the drone
914        running the given Process.  Otherwise, the file will be written to the
915        results repository.
916        """
917        file_contents = '\n'.join(lines) + '\n'
918        if paired_with_process:
919            drone = self._get_drone_for_process(paired_with_process)
920            on_results_repository = False
921        else:
922            drone = self._results_drone
923            on_results_repository = True
924        full_path = self.absolute_path(
925                file_path, on_results_repository=on_results_repository)
926        drone.queue_call('write_to_file', full_path, file_contents)
927
928
# Resolve the manager base class through utils.import_site_class, naming the
# optional site module, the class expected inside it, and BaseDroneManager.
# NOTE(review): presumably BaseDroneManager is what comes back when no site
# module exists -- confirm against utils.import_site_class.
SiteDroneManager = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_drone_manager',
        'SiteDroneManager', BaseDroneManager)
932
933
class DroneManager(SiteDroneManager):
    """
    Concrete drone manager class; it adds nothing on top of SiteDroneManager.
    """
    pass
936
937
# Module-wide DroneManager singleton; created lazily by instance().
_the_instance = None

def instance():
    """
    Return the shared DroneManager, creating it on the first call.
    """
    if _the_instance is None:
        _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    """
    Replace the shared manager returned by instance().

    @param instance: object to install as the singleton; passing None makes
            the next instance() call build a new DroneManager.
    """
    global _the_instance
    _the_instance = instance
949