1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Run a group of subprocesses and then finish."""
15
16from __future__ import print_function
17
18import logging
19import multiprocessing
20import os
21import platform
22import re
23import signal
24import subprocess
25import sys
26import tempfile
27import time
28import errno
29
30# cpu cost measurement
31measure_cpu_costs = False
32
33_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
34# Maximum number of bytes of job's stdout that will be stored in the result.
35# Only last N bytes of stdout will be kept if the actual output longer.
36_MAX_RESULT_SIZE = 64 * 1024
37
38
39# NOTE: If you change this, please make sure to test reviewing the
40# github PR with http://reviewable.io, which is known to add UTF-8
41# characters to the PR description, which leak into the environment here
42# and cause failures.
43def strip_non_ascii_chars(s):
44    return ''.join(c for c in s if ord(c) < 128)
45
46
47def sanitized_environment(env):
48    sanitized = {}
49    for key, value in env.items():
50        sanitized[strip_non_ascii_chars(key)] = strip_non_ascii_chars(value)
51    return sanitized
52
53
54def platform_string():
55    if platform.system() == 'Windows':
56        return 'windows'
57    elif platform.system()[:7] == 'MSYS_NT':
58        return 'windows'
59    elif platform.system() == 'Darwin':
60        return 'mac'
61    elif platform.system() == 'Linux':
62        return 'linux'
63    else:
64        return 'posix'
65
66
67# setup a signal handler so that signal.pause registers 'something'
68# when a child finishes
69# not using futures and threading to avoid a dependency on subprocess32
70if platform_string() == 'windows':
71    pass
72else:
73
74    def alarm_handler(unused_signum, unused_frame):
75        pass
76
77    signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
78    signal.signal(signal.SIGALRM, alarm_handler)
79
80_SUCCESS = object()
81_FAILURE = object()
82_RUNNING = object()
83_KILLED = object()
84
85_COLORS = {
86    'red': [31, 0],
87    'green': [32, 0],
88    'yellow': [33, 0],
89    'lightgray': [37, 0],
90    'gray': [30, 1],
91    'purple': [35, 0],
92    'cyan': [36, 0]
93}
94
95_BEGINNING_OF_LINE = '\x1b[0G'
96_CLEAR_LINE = '\x1b[2K'
97
98_TAG_COLOR = {
99    'FAILED': 'red',
100    'FLAKE': 'purple',
101    'TIMEOUT_FLAKE': 'purple',
102    'WARNING': 'yellow',
103    'TIMEOUT': 'red',
104    'PASSED': 'green',
105    'START': 'gray',
106    'WAITING': 'yellow',
107    'SUCCESS': 'green',
108    'IDLE': 'gray',
109    'SKIPPED': 'cyan'
110}
111
112_FORMAT = '%(asctime)-15s %(message)s'
113logging.basicConfig(level=logging.INFO, format=_FORMAT)
114
115
116def eintr_be_gone(fn):
117    """Run fn until it doesn't stop because of EINTR"""
118    while True:
119        try:
120            return fn()
121        except IOError, e:
122            if e.errno != errno.EINTR:
123                raise
124
125
126def message(tag, msg, explanatory_text=None, do_newline=False):
127    if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
128        return
129    message.old_tag = tag
130    message.old_msg = msg
131    while True:
132        try:
133            if platform_string() == 'windows' or not sys.stdout.isatty():
134                if explanatory_text:
135                    logging.info(explanatory_text)
136                logging.info('%s: %s', tag, msg)
137            else:
138                sys.stdout.write(
139                    '%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' %
140                    (_BEGINNING_OF_LINE, _CLEAR_LINE, '\n%s' % explanatory_text
141                     if explanatory_text is not None else '',
142                     _COLORS[_TAG_COLOR[tag]][1], _COLORS[_TAG_COLOR[tag]][0],
143                     tag, msg, '\n'
144                     if do_newline or explanatory_text is not None else ''))
145            sys.stdout.flush()
146            return
147        except IOError, e:
148            if e.errno != errno.EINTR:
149                raise
150
151
152message.old_tag = ''
153message.old_msg = ''
154
155
156def which(filename):
157    if '/' in filename:
158        return filename
159    for path in os.environ['PATH'].split(os.pathsep):
160        if os.path.exists(os.path.join(path, filename)):
161            return os.path.join(path, filename)
162    raise Exception('%s not found' % filename)
163
164
165class JobSpec(object):
166    """Specifies what to run for a job."""
167
168    def __init__(self,
169                 cmdline,
170                 shortname=None,
171                 environ=None,
172                 cwd=None,
173                 shell=False,
174                 timeout_seconds=5 * 60,
175                 flake_retries=0,
176                 timeout_retries=0,
177                 kill_handler=None,
178                 cpu_cost=1.0,
179                 verbose_success=False):
180        """
181    Arguments:
182      cmdline: a list of arguments to pass as the command line
183      environ: a dictionary of environment variables to set in the child process
184      kill_handler: a handler that will be called whenever job.kill() is invoked
185      cpu_cost: number of cores per second this job needs
186    """
187        if environ is None:
188            environ = {}
189        self.cmdline = cmdline
190        self.environ = environ
191        self.shortname = cmdline[0] if shortname is None else shortname
192        self.cwd = cwd
193        self.shell = shell
194        self.timeout_seconds = timeout_seconds
195        self.flake_retries = flake_retries
196        self.timeout_retries = timeout_retries
197        self.kill_handler = kill_handler
198        self.cpu_cost = cpu_cost
199        self.verbose_success = verbose_success
200
201    def identity(self):
202        return '%r %r' % (self.cmdline, self.environ)
203
204    def __hash__(self):
205        return hash(self.identity())
206
207    def __cmp__(self, other):
208        return self.identity() == other.identity()
209
210    def __repr__(self):
211        return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname,
212                                                      self.cmdline)
213
214    def __str__(self):
215        return '%s: %s %s' % (self.shortname, ' '.join(
216            '%s=%s' % kv for kv in self.environ.items()),
217                              ' '.join(self.cmdline))
218
219
220class JobResult(object):
221
222    def __init__(self):
223        self.state = 'UNKNOWN'
224        self.returncode = -1
225        self.elapsed_time = 0
226        self.num_failures = 0
227        self.retries = 0
228        self.message = ''
229        self.cpu_estimated = 1
230        self.cpu_measured = 1
231
232
233def read_from_start(f):
234    f.seek(0)
235    return f.read()
236
237
238class Job(object):
239    """Manages one job."""
240
241    def __init__(self,
242                 spec,
243                 newline_on_success,
244                 travis,
245                 add_env,
246                 quiet_success=False):
247        self._spec = spec
248        self._newline_on_success = newline_on_success
249        self._travis = travis
250        self._add_env = add_env.copy()
251        self._retries = 0
252        self._timeout_retries = 0
253        self._suppress_failure_message = False
254        self._quiet_success = quiet_success
255        if not self._quiet_success:
256            message('START', spec.shortname, do_newline=self._travis)
257        self.result = JobResult()
258        self.start()
259
260    def GetSpec(self):
261        return self._spec
262
263    def start(self):
264        self._tempfile = tempfile.TemporaryFile()
265        env = dict(os.environ)
266        env.update(self._spec.environ)
267        env.update(self._add_env)
268        env = sanitized_environment(env)
269        self._start = time.time()
270        cmdline = self._spec.cmdline
271        # The Unix time command is finicky when used with MSBuild, so we don't use it
272        # with jobs that run MSBuild.
273        global measure_cpu_costs
274        if measure_cpu_costs and not 'vsprojects\\build' in cmdline[0]:
275            cmdline = ['time', '-p'] + cmdline
276        else:
277            measure_cpu_costs = False
278        try_start = lambda: subprocess.Popen(args=cmdline,
279                                             stderr=subprocess.STDOUT,
280                                             stdout=self._tempfile,
281                                             cwd=self._spec.cwd,
282                                             shell=self._spec.shell,
283                                             env=env)
284        delay = 0.3
285        for i in range(0, 4):
286            try:
287                self._process = try_start()
288                break
289            except OSError:
290                message('WARNING',
291                        'Failed to start %s, retrying in %f seconds' %
292                        (self._spec.shortname, delay))
293                time.sleep(delay)
294                delay *= 2
295        else:
296            self._process = try_start()
297        self._state = _RUNNING
298
299    def state(self):
300        """Poll current state of the job. Prints messages at completion."""
301
302        def stdout(self=self):
303            stdout = read_from_start(self._tempfile)
304            self.result.message = stdout[-_MAX_RESULT_SIZE:]
305            return stdout
306
307        if self._state == _RUNNING and self._process.poll() is not None:
308            elapsed = time.time() - self._start
309            self.result.elapsed_time = elapsed
310            if self._process.returncode != 0:
311                if self._retries < self._spec.flake_retries:
312                    message(
313                        'FLAKE',
314                        '%s [ret=%d, pid=%d]' %
315                        (self._spec.shortname, self._process.returncode,
316                         self._process.pid),
317                        stdout(),
318                        do_newline=True)
319                    self._retries += 1
320                    self.result.num_failures += 1
321                    self.result.retries = self._timeout_retries + self._retries
322                    # NOTE: job is restarted regardless of jobset's max_time setting
323                    self.start()
324                else:
325                    self._state = _FAILURE
326                    if not self._suppress_failure_message:
327                        message(
328                            'FAILED',
329                            '%s [ret=%d, pid=%d, time=%.1fsec]' %
330                            (self._spec.shortname, self._process.returncode,
331                             self._process.pid, elapsed),
332                            stdout(),
333                            do_newline=True)
334                    self.result.state = 'FAILED'
335                    self.result.num_failures += 1
336                    self.result.returncode = self._process.returncode
337            else:
338                self._state = _SUCCESS
339                measurement = ''
340                if measure_cpu_costs:
341                    m = re.search(
342                        r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)',
343                        stdout())
344                    real = float(m.group(1))
345                    user = float(m.group(2))
346                    sys = float(m.group(3))
347                    if real > 0.5:
348                        cores = (user + sys) / real
349                        self.result.cpu_measured = float('%.01f' % cores)
350                        self.result.cpu_estimated = float(
351                            '%.01f' % self._spec.cpu_cost)
352                        measurement = '; cpu_cost=%.01f; estimated=%.01f' % (
353                            self.result.cpu_measured, self.result.cpu_estimated)
354                if not self._quiet_success:
355                    message(
356                        'PASSED',
357                        '%s [time=%.1fsec, retries=%d:%d%s]' %
358                        (self._spec.shortname, elapsed, self._retries,
359                         self._timeout_retries, measurement),
360                        stdout() if self._spec.verbose_success else None,
361                        do_newline=self._newline_on_success or self._travis)
362                self.result.state = 'PASSED'
363        elif (self._state == _RUNNING and
364              self._spec.timeout_seconds is not None and
365              time.time() - self._start > self._spec.timeout_seconds):
366            elapsed = time.time() - self._start
367            self.result.elapsed_time = elapsed
368            if self._timeout_retries < self._spec.timeout_retries:
369                message(
370                    'TIMEOUT_FLAKE',
371                    '%s [pid=%d]' % (self._spec.shortname, self._process.pid),
372                    stdout(),
373                    do_newline=True)
374                self._timeout_retries += 1
375                self.result.num_failures += 1
376                self.result.retries = self._timeout_retries + self._retries
377                if self._spec.kill_handler:
378                    self._spec.kill_handler(self)
379                self._process.terminate()
380                # NOTE: job is restarted regardless of jobset's max_time setting
381                self.start()
382            else:
383                message(
384                    'TIMEOUT',
385                    '%s [pid=%d, time=%.1fsec]' % (self._spec.shortname,
386                                                   self._process.pid, elapsed),
387                    stdout(),
388                    do_newline=True)
389                self.kill()
390                self.result.state = 'TIMEOUT'
391                self.result.num_failures += 1
392        return self._state
393
394    def kill(self):
395        if self._state == _RUNNING:
396            self._state = _KILLED
397            if self._spec.kill_handler:
398                self._spec.kill_handler(self)
399            self._process.terminate()
400
401    def suppress_failure_message(self):
402        self._suppress_failure_message = True
403
404
405class Jobset(object):
406    """Manages one run of jobs."""
407
408    def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic,
409                 newline_on_success, travis, stop_on_failure, add_env,
410                 quiet_success, max_time):
411        self._running = set()
412        self._check_cancelled = check_cancelled
413        self._cancelled = False
414        self._failures = 0
415        self._completed = 0
416        self._maxjobs = maxjobs
417        self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic
418        self._newline_on_success = newline_on_success
419        self._travis = travis
420        self._stop_on_failure = stop_on_failure
421        self._add_env = add_env
422        self._quiet_success = quiet_success
423        self._max_time = max_time
424        self.resultset = {}
425        self._remaining = None
426        self._start_time = time.time()
427
428    def set_remaining(self, remaining):
429        self._remaining = remaining
430
431    def get_num_failures(self):
432        return self._failures
433
434    def cpu_cost(self):
435        c = 0
436        for job in self._running:
437            c += job._spec.cpu_cost
438        return c
439
440    def start(self, spec):
441        """Start a job. Return True on success, False on failure."""
442        while True:
443            if self._max_time > 0 and time.time(
444            ) - self._start_time > self._max_time:
445                skipped_job_result = JobResult()
446                skipped_job_result.state = 'SKIPPED'
447                message('SKIPPED', spec.shortname, do_newline=True)
448                self.resultset[spec.shortname] = [skipped_job_result]
449                return True
450            if self.cancelled(): return False
451            current_cpu_cost = self.cpu_cost()
452            if current_cpu_cost == 0: break
453            if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
454                if len(self._running) < self._maxjobs_cpu_agnostic:
455                    break
456            self.reap(spec.shortname, spec.cpu_cost)
457        if self.cancelled(): return False
458        job = Job(spec, self._newline_on_success, self._travis, self._add_env,
459                  self._quiet_success)
460        self._running.add(job)
461        if job.GetSpec().shortname not in self.resultset:
462            self.resultset[job.GetSpec().shortname] = []
463        return True
464
465    def reap(self, waiting_for=None, waiting_for_cost=None):
466        """Collect the dead jobs."""
467        while self._running:
468            dead = set()
469            for job in self._running:
470                st = eintr_be_gone(lambda: job.state())
471                if st == _RUNNING: continue
472                if st == _FAILURE or st == _KILLED:
473                    self._failures += 1
474                    if self._stop_on_failure:
475                        self._cancelled = True
476                        for job in self._running:
477                            job.kill()
478                dead.add(job)
479                break
480            for job in dead:
481                self._completed += 1
482                if not self._quiet_success or job.result.state != 'PASSED':
483                    self.resultset[job.GetSpec().shortname].append(job.result)
484                self._running.remove(job)
485            if dead: return
486            if not self._travis and platform_string() != 'windows':
487                rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
488                if self._remaining is not None and self._completed > 0:
489                    now = time.time()
490                    sofar = now - self._start_time
491                    remaining = sofar / self._completed * (
492                        self._remaining + len(self._running))
493                    rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
494                if waiting_for is not None:
495                    wstr = ' next: %s @ %.2f cpu' % (waiting_for,
496                                                     waiting_for_cost)
497                else:
498                    wstr = ''
499                message(
500                    'WAITING',
501                    '%s%d jobs running, %d complete, %d failed (load %.2f)%s' %
502                    (rstr, len(self._running), self._completed, self._failures,
503                     self.cpu_cost(), wstr))
504            if platform_string() == 'windows':
505                time.sleep(0.1)
506            else:
507                signal.alarm(10)
508                signal.pause()
509
510    def cancelled(self):
511        """Poll for cancellation."""
512        if self._cancelled: return True
513        if not self._check_cancelled(): return False
514        for job in self._running:
515            job.kill()
516        self._cancelled = True
517        return True
518
519    def finish(self):
520        while self._running:
521            if self.cancelled(): pass  # poll cancellation
522            self.reap()
523        if platform_string() != 'windows':
524            signal.alarm(0)
525        return not self.cancelled() and self._failures == 0
526
527
528def _never_cancelled():
529    return False
530
531
532def tag_remaining(xs):
533    staging = []
534    for x in xs:
535        staging.append(x)
536        if len(staging) > 5000:
537            yield (staging.pop(0), None)
538    n = len(staging)
539    for i, x in enumerate(staging):
540        yield (x, n - i - 1)
541
542
543def run(cmdlines,
544        check_cancelled=_never_cancelled,
545        maxjobs=None,
546        maxjobs_cpu_agnostic=None,
547        newline_on_success=False,
548        travis=False,
549        infinite_runs=False,
550        stop_on_failure=False,
551        add_env={},
552        skip_jobs=False,
553        quiet_success=False,
554        max_time=-1):
555    if skip_jobs:
556        resultset = {}
557        skipped_job_result = JobResult()
558        skipped_job_result.state = 'SKIPPED'
559        for job in cmdlines:
560            message('SKIPPED', job.shortname, do_newline=True)
561            resultset[job.shortname] = [skipped_job_result]
562        return 0, resultset
563    js = Jobset(check_cancelled, maxjobs if maxjobs is not None else
564                _DEFAULT_MAX_JOBS, maxjobs_cpu_agnostic
565                if maxjobs_cpu_agnostic is not None else _DEFAULT_MAX_JOBS,
566                newline_on_success, travis, stop_on_failure, add_env,
567                quiet_success, max_time)
568    for cmdline, remaining in tag_remaining(cmdlines):
569        if not js.start(cmdline):
570            break
571        if remaining is not None:
572            js.set_remaining(remaining)
573    js.finish()
574    return js.get_num_failures(), js.resultset
575