1# Copyright 2017 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Library providing an API to lucifer."""
6
7import os
8import logging
9import pipes
10import socket
11import subprocess
12
13import common
14from autotest_lib.client.bin import local_host
15from autotest_lib.client.common_lib import global_config
16from autotest_lib.scheduler.drone_manager import PidfileId
17from autotest_lib.server.hosts import ssh_host
18from autotest_lib.frontend.afe import models
19
# Handle on the global autotest configuration, and the name of the
# section in it that the helpers below read lucifer settings from.
_config = global_config.global_config
_SECTION = 'LUCIFER'

# TODO(crbug.com/748234): Move these to shadow_config.ini
# See also drones.AUTOTEST_INSTALL_DIR
# _ENV is the program handed to Drone.spawn(); job_reporter is passed
# to it as an argument, optionally preceded by a NAME=VALUE setting.
_ENV = '/usr/bin/env'
_AUTOTEST_DIR = '/usr/local/autotest'
_JOB_REPORTER_PATH = os.path.join(_AUTOTEST_DIR, 'bin', 'job_reporter')

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
30
31
def is_lucifer_enabled():
    """Return True if lucifer is enabled.

    This currently returns True unconditionally; no config value is
    consulted.
    """
    return True
35
36
def is_enabled_for(level):
    """Tell whether lucifer is configured to run at the given level.

    The 'lucifer_level' config value and level are compared case
    insensitively.

    @param level: string, e.g. 'PARSING', 'GATHERING'
    @returns: True if lucifer is enabled and the configured level
        equals level, False otherwise.
    """
    if is_lucifer_enabled():
        configured = _config.get_config_value(
                _SECTION, 'lucifer_level').upper()
        return level.upper() == configured
    return False
47
48
def is_lucifer_owned(job):
    """Tell whether a job has already been handed off to lucifer.

    A job counts as handed off when it has a 'jobhandoff' attribute.

    @param job: frontend.afe.models.Job instance
    @returns: True if the job has a 'jobhandoff' attribute.
    """
    assert isinstance(job, models.Job)
    handed_off = hasattr(job, 'jobhandoff')
    return handed_off
56
57
def is_lucifer_owned_by_id(job_id):
    """Tell whether the job with the given id was handed off to lucifer.

    @param job_id: id of the job to look up
    @returns: True if a JobHandoff row exists for job_id.
    """
    handoffs = models.JobHandoff.objects.filter(job_id=job_id)
    return handoffs.exists()
61
62
def is_split_job(hqe_id):
    """Tell whether an HQE's job has HQEs spread over different groups.

    For example, if the given HQE has execution_subdir=foo and the job
    also has an HQE with execution_subdir=bar, this returns True.  The
    only situation where this happens is when provisioning in a
    multi-DUT job fails; the HQEs will then each be in their own group.

    See https://bugs.chromium.org/p/chromium/issues/detail?id=811877

    @param hqe_id: HQE id
    @returns: True if the job's HQEs do not share one execution path.
    """
    entry = models.HostQueueEntry.objects.get(id=hqe_id)
    job_entries = entry.job.hostqueueentry_set.all()
    try:
        _get_consistent_execution_path(job_entries)
    except ExecutionPathError:
        split = True
    else:
        split = False
    return split
82
83
84# TODO(crbug.com/748234): This is temporary to enable toggling
85# lucifer rollouts with an option.
def spawn_starting_job_handler(manager, job):
    """Spawn job_reporter to handle a job at the STARTING level.

    Pass all arguments by keyword.

    A drone is picked from the manager, job_reporter is spawned on it
    through env(1) with its output going to a log file under the job's
    results directory, and the drone's process accounting is updated.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    drone = manager.pick_drone_to_use()
    results_dir = _results_dir(manager, job)
    args = [
            _JOB_REPORTER_PATH,

            # General configuration
            '--jobdir', _get_jobdir(),
            '--lucifer-path', _get_lucifer_path(),

            # Job specific
            '--lucifer-level', 'STARTING',
            '--job-id', str(job.id),
            '--results-dir', results_dir,

            # STARTING specific
            '--execution-tag', _working_directory(job),
    ]
    gcp_creds = _get_gcp_creds()
    if gcp_creds:
        # The command is run through env(1), so a leading NAME=VALUE
        # argument sets the variable for job_reporter.  The value must
        # not be shell-quoted here: Drone.spawn() takes an argument
        # list (the local drone execs it directly and the remote drone
        # quotes every argument itself), so quoting here would leave
        # literal quote characters inside the credentials path.
        args = ['GOOGLE_APPLICATION_CREDENTIALS=%s' % gcp_creds] + args
    drone.spawn(_ENV, args,
                output_file=_prepare_output_file(drone, results_dir))
    # Account for the new process so that drone selection sees the
    # added load.
    drone.add_active_processes(1)
    manager.reorder_drone_queue()
    manager.register_pidfile_processes(
            os.path.join(results_dir, '.autoserv_execute'), 1)
    return drone
125
126
127# TODO(crbug.com/748234): This is temporary to enable toggling
128# lucifer rollouts with an option.
def spawn_parsing_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    """Spawn job_reporter to handle only the parsing of a job.

    Pass all arguments by keyword.

    job_reporter is spawned through env(1) with --parsing-only, with
    its output going to a log file under the job's results directory,
    and the drone's process accounting is updated.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @param autoserv_exit: autoserv exit status (accepted but currently
        not used by this function)
    @param pidfile_id: PidfileId instance.  If given, the drone that
        owns this pidfile is used; otherwise a drone is picked by the
        manager.
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    if pidfile_id is None:
        drone = manager.pick_drone_to_use()
    else:
        drone = manager.get_drone_for_pidfile(pidfile_id)
    results_dir = _results_dir(manager, job)
    args = [
            _JOB_REPORTER_PATH,

            # General configuration
            '--jobdir', _get_jobdir(),
            '--lucifer-path', _get_lucifer_path(),

            # Job specific
            '--job-id', str(job.id),
            '--lucifer-level', 'STARTING',
            '--parsing-only',
            '--results-dir', results_dir,
    ]
    gcp_creds = _get_gcp_creds()
    if gcp_creds:
        # The command is run through env(1), so a leading NAME=VALUE
        # argument sets the variable for job_reporter.  The value must
        # not be shell-quoted here: Drone.spawn() takes an argument
        # list (the local drone execs it directly and the remote drone
        # quotes every argument itself), so quoting here would leave
        # literal quote characters inside the credentials path.
        args = ['GOOGLE_APPLICATION_CREDENTIALS=%s' % gcp_creds] + args
    drone.spawn(_ENV, args,
                output_file=_prepare_output_file(drone, results_dir))
    # Account for the new process so that drone selection sees the
    # added load.
    drone.add_active_processes(1)
    manager.reorder_drone_queue()
    manager.register_pidfile_processes(
            os.path.join(results_dir, '.autoserv_execute'), 1)
    return drone
171
172
_LUCIFER_DIR = 'lucifer'


def _prepare_output_file(drone, results_dir):
    """Create the lucifer log directory and return the output log path.

    @param drone: Drone instance used to create the directory
    @param results_dir: results directory of the job
    @returns: path of the job_reporter output log below results_dir
    """
    lucifer_logs = os.path.join(results_dir, _LUCIFER_DIR)
    mkdir_args = ['-p', lucifer_logs]
    drone.run('mkdir', mkdir_args)
    return os.path.join(lucifer_logs, 'job_reporter_output.log')
180
181
def _get_jobdir():
    """Return the 'jobdir' value from the lucifer config section."""
    jobdir = _config.get_config_value(_SECTION, 'jobdir')
    return jobdir
184
185
def _get_lucifer_path():
    """Return the path of 'lucifer' inside the binaries directory."""
    binaries_dir = _get_binaries_path()
    return os.path.join(binaries_dir, 'lucifer')
188
189
def _get_binaries_path():
    """Return the 'binaries_path' value from the lucifer config section."""
    binaries_path = _config.get_config_value(_SECTION, 'binaries_path')
    return binaries_path
193
194
def _get_gcp_creds():
    """Return the configured path to GCP service account credentials.

    The 'gcp_creds' config value defaults to the empty string, which
    means that no credentials will be used.
    """
    creds_path = _config.get_config_value(_SECTION, 'gcp_creds', default='')
    return creds_path
201
202
class _DroneManager(object):
    """Simplified drone API layered over an old style DroneManager."""

    def __init__(self, old_manager):
        """Initialize instance.

        @param old_manager: old style DroneManager to delegate to
        """
        self._manager = old_manager

    def get_num_tests_failed(self, pidfile_id):
        """Return the number of tests failed for autoserv by pidfile.

        @param pidfile_id: PidfileId instance.
        @returns: int (-1 if the pidfile contents have no count)
        """
        contents = self._manager.get_pidfile_contents(pidfile_id)
        failed = contents.num_tests_failed
        return -1 if failed is None else failed

    def get_drone_for_pidfile(self, pidfile_id):
        """Return the drone associated with a pidfile.

        @param pidfile_id: PidfileId instance.
        @returns: Drone instance wrapping the old style drone
        """
        old_drone = self._manager.get_drone_for_pidfile_id(pidfile_id)
        return _wrap_drone(old_drone)

    def pick_drone_to_use(self, num_processes=1):
        """Return a drone to use.

        Various options can be passed to optimize drone selection.

        @param num_processes: number of processes the drone is intended
            to run
        @returns: Drone instance wrapping the old style drone
        """
        return _wrap_drone(
                self._manager.pick_drone_to_use(num_processes=num_processes))

    def absolute_path(self, path):
        """Return absolute path for drone results.

        The returned path might be remote.

        @param path: path to resolve through the old manager
        """
        return self._manager.absolute_path(path)

    def register_pidfile_processes(self, path, count):
        """Register a pidfile with the given number of processes.

        This should be done to allow the drone manager to check the
        number of processes still alive.  This may be used to select
        drones based on the number of active processes as a proxy for
        load.

        The exact semantics depends on the drone manager implementation;
        implementation specific comments follow:

        Pidfiles are kept in memory to track process count.  Pidfiles
        are rediscovered when the scheduler restarts.  Thus, errors in
        pidfile tracking can be fixed by restarting the scheduler.

        @param path: path of the pidfile
        @param count: number of processes to record for the pidfile
        """
        pidfile_id = PidfileId(path)
        old_manager = self._manager
        old_manager.register_pidfile(pidfile_id)
        # The old manager has no public setter for the process count,
        # so its bookkeeping entry is updated directly.
        pidfile_info = old_manager._registered_pidfile_info[pidfile_id]
        pidfile_info.num_processes = count

    def reorder_drone_queue(self):
        """Reorder drone queue according to modified process counts.

        Call this after Drone.add_active_processes().
        """
        self._manager.reorder_drone_queue()
276
277
def _wrap_drone(old_drone):
    """Wrap an old style drone in the matching Drone implementation.

    @param old_drone: old style drone; its _host decides the wrapper
    @returns: LocalDrone for a LocalHost, RemoteDrone for an SSHHost
    @raises TypeError: if the host is of neither type
    """
    host = old_drone._host
    if isinstance(host, local_host.LocalHost):
        return LocalDrone()
    if isinstance(host, ssh_host.SSHHost):
        return RemoteDrone(old_drone)
    raise TypeError('Drone has an unknown host type')
287
288
def _results_dir(manager, job):
    """Return the absolute results directory for a job.

    Path may be on a remote host.

    @param manager: _DroneManager instance used to resolve the path
    @param job: Job instance
    """
    working_dir = _working_directory(job)
    return manager.absolute_path(working_dir)
295
296
def _working_directory(job):
    """Return the execution path shared by all of the job's HQEs.

    @param job: Job instance
    @raises ExecutionPathError: if the HQEs disagree on the path
    """
    job_entries = job.hostqueueentry_set.all()
    return _get_consistent_execution_path(job_entries)
299
300
def _get_consistent_execution_path(execution_entries):
    """Return the execution path shared by all the given entries.

    @param execution_entries: non-empty iterable of objects that
        provide an execution_path() method
    @returns: the execution path common to every entry
    @raises ExecutionPathError: if an entry's execution path differs
        from that of the first entry
    @raises IndexError: if execution_entries is empty
    """
    # Materialize the entries once so that all comparisons, and the
    # error message, are made against a single snapshot.  Callers pass
    # the result of hostqueueentry_set.all(); presumably that is a lazy
    # query set for which every index/slice would otherwise be
    # evaluated separately.
    entries = list(execution_entries)
    first_entry = entries[0]
    first_execution_path = first_entry.execution_path()
    for execution_entry in entries[1:]:
        execution_path = execution_entry.execution_path()
        if execution_path != first_execution_path:
            raise ExecutionPathError(
                    '%s (%s) != %s (%s)'
                    % (execution_path,
                       execution_entry,
                       first_execution_path,
                       first_entry))
    return first_execution_path


class ExecutionPathError(Exception):
    """Raised by _get_consistent_execution_path().

    Signals that the entries do not all share one execution path.
    """
316
317
class Drone(object):
    """Simplified drone API.

    This base class only documents the interface: every method here
    has an empty body and returns None.  Implementations override the
    methods they support.
    """

    def hostname(self):
        """Return the hostname of the drone."""

    def run(self, path, args):
        """Run a command synchronously.

        path must be an absolute path.  path may be on a remote machine.
        args is a list of arguments.

        The process may or may not have its own session.  The process
        should be short-lived.  It should not try to obtain a
        controlling terminal.

        The new process will have stdin, stdout, and stderr opened to
        /dev/null.

        This method intentionally has a very restrictive API.  It should
        be used to perform setup local to the drone, when the drone may
        be a remote machine.

        @param path: path of the program to run
        @param args: list of string arguments, not including the program
        """

    def spawn(self, path, args, output_file):
        """Spawn an independent process.

        path must be an absolute path.  path may be on a remote machine.
        args is a list of arguments.

        The process is spawned in its own session.  It should not try to
        obtain a controlling terminal.

        The new process will have stdin opened to /dev/null and stdout,
        stderr opened to output_file.

        output_file is a pathname, but how it is interpreted is
        implementation defined, e.g., it may be a remote file.

        @param path: path of the program to run
        @param args: list of string arguments, not including the program
        @param output_file: pathname receiving stdout and stderr
        """

    def add_active_processes(self, count):
        """Track additional number of active processes.

        This may be used to select drones based on the number of active
        processes as a proxy for load.

        _DroneManager.register_pidfile_processes() and
        _DroneManager.reorder_drone_queue() should also be called.

        The exact semantics depends on the drone manager implementation;
        implementation specific comments follow:

        Process count is used as a proxy for workload, and one process
        equals the workload of one autoserv or one job.  This count is
        recalculated during each scheduler tick, using pidfiles tracked
        by the drone manager (so the count added by this function only
        applies for one tick).

        @param count: number of processes to add to the drone's count
        """
376
377
class LocalDrone(Drone):
    """Drone implementation that runs commands on the local machine."""

    def hostname(self):
        """Return the hostname of the local machine."""
        return socket.gethostname()

    def run(self, path, args):
        """Run a command locally and wait for it to finish.

        stdin, stdout and stderr of the command are all connected to
        the null device.  The exit status is not reported.
        """
        with open(os.devnull, 'r+b') as devnull:
            command = [path] + args
            subprocess.call(command, stdin=devnull,
                            stdout=devnull, stderr=devnull)

    def spawn(self, path, args, output_file):
        """Spawn a detached local process writing to output_file."""
        argv = [path] + args
        _spawn(path, argv, output_file)
391
392
class RemoteDrone(Drone):
    """Drone implementation that runs commands on a host through SSH."""

    def __init__(self, drone):
        """Initialize instance.

        @param drone: old style drone whose _host is an SSHHost
        @raises TypeError: if the drone's host is not an SSHHost
        """
        host = drone._host
        if not isinstance(host, ssh_host.SSHHost):
            raise TypeError('RemoteDrone must be passed a drone with SSHHost')
        self._drone = drone
        self._host = host

    def hostname(self):
        """Return the hostname of the remote host."""
        return self._host.hostname

    def run(self, path, args):
        """Run a shell-quoted command on the host with null stdio."""
        quoted_parts = [pipes.quote(part) for part in [path] + args]
        command = ('%(cmd)s <%(null)s >%(null)s 2>&1'
                   % {'cmd': ' '.join(quoted_parts), 'null': os.devnull})
        self._host.run(command)

    def spawn(self, path, args, output_file):
        """Start a shell-quoted command in the background on the host.

        stdin is read from the null device; stdout and stderr are
        appended to output_file.
        """
        quoted_parts = [pipes.quote(part) for part in [path] + args]
        substitutions = {
                'cmd': ' '.join(quoted_parts),
                'file': pipes.quote(output_file),
                'null': os.devnull,
        }
        # SSH creates a session for each command, so we do not have to
        # do it.
        self._host.run('%(cmd)s <%(null)s >>%(file)s 2>&1 &' % substitutions)

    def add_active_processes(self, count):
        """Add count to the wrapped drone's active process count."""
        self._drone.active_processes += count
425
426
def _spawn(path, argv, output_file):
    """Spawn a new process in its own session.

    path must be an absolute path.  The first item in argv should be
    path.

    In the calling process, this function returns once the
    intermediate child has exited.  The forked processes never return
    from this function: they either exec path or exit.

    The new process will have stdin opened to /dev/null and stdout,
    stderr opened to output_file, which is created if missing and
    opened for appending.

    @param path: absolute path of the program to exec
    @param argv: list of strings; the full argument vector
    @param output_file: path of the file receiving stdout and stderr
    """
    logger.info('Spawning %r, %r, %r', path, argv, output_file)
    assert all(isinstance(arg, basestring) for arg in argv)
    pid = os.fork()
    if pid:
        os.waitpid(pid, 0)
        return
    # From here on we are in a forked copy of the caller.  Nothing may
    # propagate out of this block: an escaping exception would unwind
    # into the caller's code and leave a duplicate of the calling
    # process running.
    try:
        # Double fork to reparent to init since monitor_db does not reap.
        if os.fork():
            os._exit(os.EX_OK)
        os.setsid()
        null_fd = os.open(os.devnull, os.O_RDONLY)
        os.dup2(null_fd, 0)
        os.close(null_fd)
        out_fd = os.open(output_file, os.O_WRONLY | os.O_APPEND | os.O_CREAT)
        os.dup2(out_fd, 1)
        os.dup2(out_fd, 2)
        os.close(out_fd)
        os.execv(path, argv)
    except Exception:
        logger.exception('Failed to spawn %r', path)
    finally:
        # Only reached when the fork, the redirection or the exec
        # failed; a successful execv() never returns.
        os._exit(1)
457