1# Copyright 2017 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Library providing an API to lucifer."""
6
7import os
8import logging
9import pipes
10import socket
11
12import common
13from autotest_lib.client.bin import local_host
14from autotest_lib.client.common_lib import global_config
15from autotest_lib.server.hosts import ssh_host
16from autotest_lib.frontend.afe import models
17
# Shared global config object; every lucifer setting in this module is
# read through it.
_config = global_config.global_config
# Name of the config section that holds the lucifer settings.
_SECTION = 'LUCIFER'

# TODO(crbug.com/748234): Move these to shadow_config.ini
# See also drones.AUTOTEST_INSTALL_DIR
# Executable used to launch job_reporter; the spawn helpers may prepend a
# VAR=value argument for it.
_ENV = '/usr/bin/env'
# Autotest install directory used to locate job_reporter.
_AUTOTEST_DIR = '/usr/local/autotest'
# Path of the job_reporter program under the autotest install directory.
_JOB_REPORTER_PATH = os.path.join(_AUTOTEST_DIR, 'bin', 'job_reporter')

# Module-level logger.
logger = logging.getLogger(__name__)
28
29
def is_lucifer_enabled():
    """Report whether lucifer is enabled.

    Lucifer is unconditionally enabled; no configuration is consulted.

    @returns: always True
    """
    return True
33
34
def is_enabled_for(level):
    """Tell whether the configured lucifer level equals the given one.

    The comparison is case-insensitive.

    @param level: string, e.g. 'PARSING', 'GATHERING'
    @returns: True if lucifer is enabled and its configured
        'lucifer_level' matches level, False otherwise
    """
    if is_lucifer_enabled():
        configured = _config.get_config_value(_SECTION, 'lucifer_level')
        return configured.upper() == level.upper()
    return False
45
46
def is_lucifer_owned(job):
    """Tell whether the job has already been handed off to lucifer.

    @param job: Job instance
    @returns: True if job has a 'jobhandoff' attribute
    """
    handed_off = hasattr(job, 'jobhandoff')
    return handed_off
50
51
def is_split_job(hqe_id):
    """Tell whether the HQE's job has HQEs in more than one group.

    For example, if the given HQE has execution_subdir=foo and the job
    also has an HQE with execution_subdir=bar, this returns True.  The
    only situation where this happens is when provisioning fails in a
    multi-DUT job, which leaves each HQE in its own group.

    See https://bugs.chromium.org/p/chromium/issues/detail?id=811877

    @param hqe_id: HQE id
    @returns: True if the job's HQEs do not share one execution path
    """
    entry = models.HostQueueEntry.objects.get(id=hqe_id)
    job_entries = entry.job.hostqueueentry_set.all()
    try:
        _get_consistent_execution_path(job_entries)
    except _ExecutionPathError:
        split = True
    else:
        split = False
    return split
71
72
73# TODO(crbug.com/748234): This is temporary to enable toggling
74# lucifer rollouts with an option.
def spawn_starting_job_handler(manager, job):
    """Placeholder for spawning job_reporter to handle a starting job.

    Pass all arguments by keyword.

    This handler is not implemented; calling it always raises
    NotImplementedError.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @raises NotImplementedError: always
    """
    raise NotImplementedError()
85
86
87# TODO(crbug.com/748234): This is temporary to enable toggling
88# lucifer rollouts with an option.
def spawn_gathering_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    """Spawn job_reporter to handle a job that still needs gathering.

    Pass all arguments by keyword.

    job_reporter is launched through env(1) on a drone with the
    --need-gather flag, and its output is appended to
    job_reporter_output.log inside the job's results directory.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @param autoserv_exit: autoserv exit status
    @param pidfile_id: PidfileId instance.  If None, a drone is picked
        by the manager instead of being looked up from the pidfile.
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    if pidfile_id is None:
        drone = manager.pick_drone_to_use()
    else:
        drone = manager.get_drone_for_pidfile(pidfile_id)
    results_dir = _results_dir(manager, job)
    # NOTE(review): pidfile_id may still be None here; confirm that the
    # underlying manager accepts that when reading pidfile contents.
    num_tests_failed = manager.get_num_tests_failed(pidfile_id)
    args = [
            _JOB_REPORTER_PATH,

            # General configuration
            '--jobdir', _get_jobdir(),
            '--run-job-path', _get_run_job_path(),
            '--watcher-path', _get_watcher_path(),

            # Job specific
            '--job-id', str(job.id),
            '--lucifer-level', 'GATHERING',
            '--autoserv-exit', str(autoserv_exit),
            '--need-gather',
            '--num-tests-failed', str(num_tests_failed),
            '--results-dir', results_dir,
    ]
    gcp_creds = _get_gcp_creds()
    if gcp_creds:
        # Pass the path verbatim.  Drone.spawn() takes an argument list:
        # RemoteDrone quotes every argument itself and LocalDrone execs
        # without a shell, so quoting here would put literal quote
        # characters into the environment variable.
        args = ['GOOGLE_APPLICATION_CREDENTIALS=%s' % gcp_creds] + args
    output_file = os.path.join(results_dir, 'job_reporter_output.log')
    drone.spawn(_ENV, args, output_file=output_file)
    return drone
131
132
133# TODO(crbug.com/748234): This is temporary to enable toggling
134# lucifer rollouts with an option.
def spawn_parsing_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    """Spawn job_reporter to handle a job without requesting gathering.

    Pass all arguments by keyword.

    job_reporter is launched through env(1) on a drone without the
    --need-gather flag, and its output is appended to
    job_reporter_output.log inside the job's results directory.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @param autoserv_exit: autoserv exit status
    @param pidfile_id: PidfileId instance.  If None, a drone is picked
        by the manager instead of being looked up from the pidfile.
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    if pidfile_id is None:
        drone = manager.pick_drone_to_use()
    else:
        drone = manager.get_drone_for_pidfile(pidfile_id)
    results_dir = _results_dir(manager, job)
    args = [
            _JOB_REPORTER_PATH,

            # General configuration
            '--jobdir', _get_jobdir(),
            '--run-job-path', _get_run_job_path(),
            '--watcher-path', _get_watcher_path(),

            # Job specific
            '--job-id', str(job.id),
            # NOTE(review): the level is 'GATHERING' even though this is
            # the parsing handler; presumably intentional, with the
            # missing --need-gather flag making the difference -- confirm.
            '--lucifer-level', 'GATHERING',
            '--autoserv-exit', str(autoserv_exit),
            '--results-dir', results_dir,
    ]
    gcp_creds = _get_gcp_creds()
    if gcp_creds:
        # Pass the path verbatim.  Drone.spawn() takes an argument list:
        # RemoteDrone quotes every argument itself and LocalDrone execs
        # without a shell, so quoting here would put literal quote
        # characters into the environment variable.
        args = ['GOOGLE_APPLICATION_CREDENTIALS=%s' % gcp_creds] + args
    output_file = os.path.join(results_dir, 'job_reporter_output.log')
    drone.spawn(_ENV, args, output_file=output_file)
    return drone
174
175
def _get_jobdir():
    """Return the 'jobdir' setting from the lucifer config section."""
    jobdir = _config.get_config_value(_SECTION, 'jobdir')
    return jobdir
178
179
def _get_run_job_path():
    """Return the path of lucifer_run_job inside the binaries dir."""
    binaries_dir = _get_binaries_path()
    return os.path.join(binaries_dir, 'lucifer_run_job')
182
183
def _get_watcher_path():
    """Return the path of lucifer_watcher inside the binaries dir."""
    binaries_dir = _get_binaries_path()
    return os.path.join(binaries_dir, 'lucifer_watcher')
186
187
def _get_binaries_path():
    """Return the 'binaries_path' setting from the lucifer config section."""
    binaries_dir = _config.get_config_value(_SECTION, 'binaries_path')
    return binaries_dir
191
192
def _get_gcp_creds():
    """Return the configured path to GCP service account credentials.

    The 'gcp_creds' setting defaults to the empty string, which means
    no credentials will be used.
    """
    creds_path = _config.get_config_value(_SECTION, 'gcp_creds', default='')
    return creds_path
199
200
class _DroneManager(object):
    """Adapter exposing a simplified drone API over an old manager."""

    def __init__(self, old_manager):
        """Wrap an old style manager.

        @param old_manager: old style DroneManager
        """
        self._manager = old_manager

    def get_num_tests_failed(self, pidfile_id):
        """Look up how many tests failed for autoserv by pidfile.

        @param pidfile_id: PidfileId instance.
        @returns: int (-1 if the pidfile contents carry no count)
        """
        contents = self._manager.get_pidfile_contents(pidfile_id)
        failed = contents.num_tests_failed
        return -1 if failed is None else failed

    def get_drone_for_pidfile(self, pidfile_id):
        """Return the wrapped drone associated with a pidfile.

        @param pidfile_id: PidfileId instance.
        """
        old_drone = self._manager.get_drone_for_pidfile_id(pidfile_id)
        return _wrap_drone(old_drone)

    def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):
        """Ask the old manager for a drone and return it wrapped.

        Various options can be passed to optimize drone selection.

        @param num_processes: number of processes the drone is intended
            to run
        @param prefer_ssp: indicates whether drones supporting
            server-side packaging should be preferred.  The returned
            drone is not guaranteed to support it.
        """
        selection = {
                'num_processes': num_processes,
                'prefer_ssp': prefer_ssp,
        }
        return _wrap_drone(self._manager.pick_drone_to_use(**selection))

    def absolute_path(self, path):
        """Resolve a drone results path to an absolute path.

        The returned path might be remote.
        """
        resolved = self._manager.absolute_path(path)
        return resolved
252
253
def _wrap_drone(old_drone):
    """Build a new style Drone from an old style drone.

    @param old_drone: old style drone; its _host attribute selects the
        implementation
    @returns: LocalDrone for a LocalHost, RemoteDrone for an SSHHost
    @raises TypeError: if the host is of neither type
    """
    wrapped_host = old_drone._host
    if isinstance(wrapped_host, local_host.LocalHost):
        return LocalDrone()
    if isinstance(wrapped_host, ssh_host.SSHHost):
        return RemoteDrone(wrapped_host)
    raise TypeError('Drone has an unknown host type')
263
264
def _results_dir(manager, job):
    """Compute the absolute results directory of a job.

    The path may be on a remote host.

    @param manager: object providing absolute_path()
    @param job: Job instance
    """
    relative_dir = _working_directory(job)
    return manager.absolute_path(relative_dir)
271
272
def _working_directory(job):
    """Return the execution path shared by all of the job's HQEs.

    @param job: Job instance
    """
    job_entries = job.hostqueueentry_set.all()
    return _get_consistent_execution_path(job_entries)
275
276
def _get_consistent_execution_path(execution_entries):
    """Return the execution path that all entries have in common.

    @param execution_entries: indexable, sliceable collection of
        entries providing execution_path()
    @returns: the execution path of the first entry
    @raises _ExecutionPathError: if any later entry has a different
        execution path than the first one
    """
    reference_entry = execution_entries[0]
    reference_path = reference_entry.execution_path()
    for entry in execution_entries[1:]:
        entry_path = entry.execution_path()
        if entry_path == reference_path:
            continue
        details = (entry_path, entry, reference_path, reference_entry)
        raise _ExecutionPathError('%s (%s) != %s (%s)' % details)
    return reference_path
288
289
class _ExecutionPathError(Exception):
    """Error signalling that entries do not share one execution path.

    Raised by _get_consistent_execution_path().
    """
292
293
class Drone(object):
    """Interface of the simplified drone API.

    The methods here have no implementation; subclasses override them.
    """

    def hostname(self):
        """Return the hostname of the drone."""

    def spawn(self, path, args, output_file):
        """Start an independent process on the drone.

        @param path: absolute path of the program; it may be on a
            remote machine
        @param args: list of arguments
        @param output_file: pathname that receives stdout and stderr.
            How it is interpreted is implementation defined, e.g., it
            may be a remote file.

        The process is spawned in its own session and should not try
        to obtain a controlling terminal.  Its stdin is opened to
        /dev/null.
        """
315
316
class LocalDrone(Drone):
    """Drone implementation running processes on the local machine."""

    def hostname(self):
        """Return the hostname of the local machine."""
        local_name = socket.gethostname()
        return local_name

    def spawn(self, path, args, output_file):
        """Spawn path locally, using path itself as argv[0]."""
        argv = [path] + args
        _spawn(path, argv, output_file)
325
326
class RemoteDrone(Drone):
    """Drone implementation running processes on a host over SSH."""

    def __init__(self, host):
        """Initialize instance.

        @param host: SSHHost instance
        @raises TypeError: if host is not an SSHHost
        """
        if isinstance(host, ssh_host.SSHHost):
            self._host = host
        else:
            raise TypeError('RemoteDrone must be passed an SSHHost')

    def hostname(self):
        """Return the hostname of the wrapped host."""
        return self._host.hostname

    def spawn(self, path, args, output_file):
        """Run path with args in the background on the remote host.

        Every command word and the output file name are shell-quoted.
        stdin is read from the null device; stdout and stderr are
        appended to output_file.
        """
        quoted_words = [pipes.quote(word) for word in [path] + args]
        substitutions = {
                'cmd': ' '.join(quoted_words),
                'file': pipes.quote(output_file),
                'null': os.devnull,
        }
        # SSH creates a session for each command, so we do not have to
        # do it.
        self._host.run('%(cmd)s <%(null)s >>%(file)s 2>&1 &' % substitutions)
348
349
def _spawn(path, argv, output_file):
    """Spawn a new process in its own session.

    path must be an absolute path.  The first item in argv should be
    path.

    In the calling process, this function returns on success.
    The forked process puts itself in its own session and execs.

    The new process will have stdin opened to /dev/null and stdout,
    stderr opened to output_file.

    @param path: absolute path of the program to exec
    @param argv: list of strings; the full argument vector
    @param output_file: path of the file stdout and stderr are appended
        to; it is created if missing
    @raises TypeError: if argv contains a non-string item (checked
        before anything is forked)
    """
    # basestring only exists on Python 2; fall back to str elsewhere.
    try:
        string_types = basestring
    except NameError:
        string_types = str
    # Validate before forking so a bad call fails in the caller.
    if not all(isinstance(arg, string_types) for arg in argv):
        raise TypeError('argv must contain only strings: %r' % (argv,))
    logger.info('Spawning %r, %r, %r', path, argv, output_file)
    pid = os.fork()
    if pid:
        # Reap the intermediate child, which exits right after forking.
        os.waitpid(pid, 0)
        return
    # From here on we are in a forked copy of the caller.  It must never
    # return or let an exception propagate, otherwise a duplicate of the
    # calling process would keep running the caller's code.
    try:
        # Double fork to reparent to init since monitor_db does not reap.
        if os.fork():
            os._exit(os.EX_OK)
        os.setsid()
        null_fd = os.open(os.devnull, os.O_RDONLY)
        os.dup2(null_fd, 0)
        os.close(null_fd)
        out_flags = os.O_WRONLY | os.O_APPEND | os.O_CREAT
        out_fd = os.open(output_file, out_flags)
        os.dup2(out_fd, 1)
        os.dup2(out_fd, 2)
        os.close(out_fd)
        os.execv(path, argv)
    except BaseException:
        logger.exception('Failed to spawn %r', path)
    finally:
        # Only reached when the exec did not happen.
        os._exit(1)
380