1# -*- coding: utf-8 -*-
2# Copyright 2015 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Module to optimize the scheduling of benchmark_run tasks."""
7
8from __future__ import division
9from __future__ import print_function
10
11import sys
12import traceback
13
14from collections import defaultdict
15from threading import Lock
16from threading import Thread
17
18import test_flag
19
20from machine_image_manager import MachineImageManager
21from cros_utils import command_executer
22from cros_utils import logger
23
24
25class DutWorker(Thread):
26  """Working thread for a dut."""
27
28  def __init__(self, dut, sched):
29    super(DutWorker, self).__init__(name='DutWorker-{}'.format(dut.name))
30    self._dut = dut
31    self._sched = sched
32    self._stat_num_br_run = 0
33    self._stat_num_reimage = 0
34    self._stat_annotation = ''
35    self._logger = logger.GetLogger(self._sched.get_experiment().log_dir)
36    self.daemon = True
37    self._terminated = False
38    self._active_br = None
39    # Race condition accessing _active_br between _execute_benchmark_run and
40    # _terminate, so lock it up.
41    self._active_br_lock = Lock()
42
43  def terminate(self):
44    self._terminated = True
45    with self._active_br_lock:
46      if self._active_br is not None:
47        # BenchmarkRun.Terminate() terminates any running testcase via
48        # suite_runner.Terminate and updates timeline.
49        self._active_br.Terminate()
50
51  def run(self):
52    """Do the "run-test->(optionally reimage)->run-test" chore.
53
54        Note - 'br' below means 'benchmark_run'.
55    """
56
57    # Firstly, handle benchmarkruns that have cache hit.
58    br = self._sched.get_cached_benchmark_run()
59    while br:
60      try:
61        self._stat_annotation = 'finishing cached {}'.format(br)
62        br.run()
63      except RuntimeError:
64        traceback.print_exc(file=sys.stdout)
65      br = self._sched.get_cached_benchmark_run()
66
67    # Secondly, handle benchmarkruns that needs to be run on dut.
68    self._setup_dut_label()
69    try:
70      self._logger.LogOutput('{} started.'.format(self))
71      while not self._terminated:
72        br = self._sched.get_benchmark_run(self._dut)
73        if br is None:
74          # No br left for this label. Considering reimaging.
75          label = self._sched.allocate_label(self._dut)
76          if label is None:
77            # No br even for other labels. We are done.
78            self._logger.LogOutput('ImageManager found no label '
79                                   'for dut, stopping working '
80                                   'thread {}.'.format(self))
81            break
82          if self._reimage(label):
83            # Reimage to run other br fails, dut is doomed, stop
84            # this thread.
85            self._logger.LogWarning('Re-image failed, dut '
86                                    'in an unstable state, stopping '
87                                    'working thread {}.'.format(self))
88            break
89        else:
90          # Execute the br.
91          self._execute_benchmark_run(br)
92    finally:
93      self._stat_annotation = 'finished'
94      # Thread finishes. Notify scheduler that I'm done.
95      self._sched.dut_worker_finished(self)
96
97  def _reimage(self, label):
98    """Reimage image to label.
99
100    Args:
101      label: the label to remimage onto dut.
102
103    Returns:
104      0 if successful, otherwise 1.
105    """
106
107    # Termination could happen anywhere, check it.
108    if self._terminated:
109      return 1
110
111    if self._sched.get_experiment().skylab:
112      self._logger.LogOutput('Skylab mode, do not image before testing.')
113      self._dut.label = label
114      return 0
115
116    self._logger.LogOutput('Reimaging {} using {}'.format(self, label))
117    self._stat_num_reimage += 1
118    self._stat_annotation = 'reimaging using "{}"'.format(label.name)
119    try:
120      # Note, only 1 reimage at any given time, this is guaranteed in
121      # ImageMachine, so no sync needed below.
122      retval = self._sched.get_experiment().machine_manager.ImageMachine(
123          self._dut, label)
124
125      if retval:
126        return 1
127    except RuntimeError:
128      return 1
129
130    self._dut.label = label
131    return 0
132
133  def _execute_benchmark_run(self, br):
134    """Execute a single benchmark_run.
135
136        Note - this function never throws exceptions.
137    """
138
139    # Termination could happen anywhere, check it.
140    if self._terminated:
141      return
142
143    self._logger.LogOutput('{} started working on {}'.format(self, br))
144    self._stat_num_br_run += 1
145    self._stat_annotation = 'executing {}'.format(br)
146    # benchmark_run.run does not throws, but just play it safe here.
147    try:
148      assert br.owner_thread is None
149      br.owner_thread = self
150      with self._active_br_lock:
151        self._active_br = br
152      br.run()
153    finally:
154      self._sched.get_experiment().BenchmarkRunFinished(br)
155      with self._active_br_lock:
156        self._active_br = None
157
158  def _setup_dut_label(self):
159    """Try to match dut image with a certain experiment label.
160
161        If such match is found, we just skip doing reimage and jump to execute
162        some benchmark_runs.
163    """
164
165    checksum_file = '/usr/local/osimage_checksum_file'
166    try:
167      rv, checksum, _ = command_executer.GetCommandExecuter().\
168          CrosRunCommandWOutput(
169              'cat ' + checksum_file,
170              chromeos_root=self._sched.get_labels(0).chromeos_root,
171              machine=self._dut.name,
172              print_to_console=False)
173      if rv == 0:
174        checksum = checksum.strip()
175        for l in self._sched.get_labels():
176          if l.checksum == checksum:
177            self._logger.LogOutput("Dut '{}' is pre-installed with '{}'".format(
178                self._dut.name, l))
179            self._dut.label = l
180            return
181    except RuntimeError:
182      traceback.print_exc(file=sys.stdout)
183      self._dut.label = None
184
185  def __str__(self):
186    return 'DutWorker[dut="{}", label="{}"]'.format(
187        self._dut.name, self._dut.label.name if self._dut.label else 'None')
188
189  def dut(self):
190    return self._dut
191
192  def status_str(self):
193    """Report thread status."""
194
195    return ('Worker thread "{}", label="{}", benchmark_run={}, '
196            'reimage={}, now {}'.format(
197                self._dut.name,
198                'None' if self._dut.label is None else self._dut.label.name,
199                self._stat_num_br_run, self._stat_num_reimage,
200                self._stat_annotation))
201
202
203class BenchmarkRunCacheReader(Thread):
204  """The thread to read cache for a list of benchmark_runs.
205
206    On creation, each instance of this class is given a br_list, which is a
207    subset of experiment._benchmark_runs.
208  """
209
210  def __init__(self, schedv2, br_list):
211    super(BenchmarkRunCacheReader, self).__init__()
212    self._schedv2 = schedv2
213    self._br_list = br_list
214    self._logger = self._schedv2.get_logger()
215
216  def run(self):
217    for br in self._br_list:
218      try:
219        br.ReadCache()
220        if br.cache_hit:
221          self._logger.LogOutput('Cache hit - {}'.format(br))
222          with self._schedv2.lock_on('_cached_br_list'):
223            self._schedv2.get_cached_run_list().append(br)
224        else:
225          self._logger.LogOutput('Cache not hit - {}'.format(br))
226      except RuntimeError:
227        traceback.print_exc(file=sys.stderr)
228
229
230class Schedv2(object):
231  """New scheduler for crosperf."""
232
233  def __init__(self, experiment):
234    self._experiment = experiment
235    self._logger = logger.GetLogger(experiment.log_dir)
236
237    # Create shortcuts to nested data structure. "_duts" points to a list of
238    # locked machines. _labels points to a list of all labels.
239    self._duts = self._experiment.machine_manager.GetMachines()
240    self._labels = self._experiment.labels
241
242    # Bookkeeping for synchronization.
243    self._workers_lock = Lock()
244    # pylint: disable=unnecessary-lambda
245    self._lock_map = defaultdict(lambda: Lock())
246
247    # Test mode flag
248    self._in_test_mode = test_flag.GetTestMode()
249
250    # Read benchmarkrun cache.
251    self._read_br_cache()
252
253    # Mapping from label to a list of benchmark_runs.
254    self._label_brl_map = dict((l, []) for l in self._labels)
255    for br in self._experiment.benchmark_runs:
256      assert br.label in self._label_brl_map
257      # Only put no-cache-hit br into the map.
258      if br not in self._cached_br_list:
259        self._label_brl_map[br.label].append(br)
260
261    # Use machine image manager to calculate initial label allocation.
262    self._mim = MachineImageManager(self._labels, self._duts)
263    self._mim.compute_initial_allocation()
264
265    # Create worker thread, 1 per dut.
266    self._active_workers = [DutWorker(dut, self) for dut in self._duts]
267    self._finished_workers = []
268
269    # Termination flag.
270    self._terminated = False
271
272  def run_sched(self):
273    """Start all dut worker threads and return immediately."""
274
275    for w in self._active_workers:
276      w.start()
277
278  def _read_br_cache(self):
279    """Use multi-threading to read cache for all benchmarkruns.
280
281        We do this by firstly creating a few threads, and then assign each
282        thread a segment of all brs. Each thread will check cache status for
283        each br and put those with cache into '_cached_br_list'.
284    """
285
286    self._cached_br_list = []
287    n_benchmarkruns = len(self._experiment.benchmark_runs)
288    if n_benchmarkruns <= 4:
289      # Use single thread to read cache.
290      self._logger.LogOutput(('Starting to read cache status for '
291                              '{} benchmark runs ...').format(n_benchmarkruns))
292      BenchmarkRunCacheReader(self, self._experiment.benchmark_runs).run()
293      return
294
295    # Split benchmarkruns set into segments. Each segment will be handled by
296    # a thread. Note, we use (x+3)/4 to mimic math.ceil(x/4).
297    n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
298    self._logger.LogOutput(('Starting {} threads to read cache status for '
299                            '{} benchmark runs ...').format(
300                                n_threads, n_benchmarkruns))
301    benchmarkruns_per_thread = (n_benchmarkruns + n_threads - 1) // n_threads
302    benchmarkrun_segments = []
303    for i in range(n_threads - 1):
304      start = i * benchmarkruns_per_thread
305      end = (i + 1) * benchmarkruns_per_thread
306      benchmarkrun_segments.append(self._experiment.benchmark_runs[start:end])
307    benchmarkrun_segments.append(
308        self._experiment.benchmark_runs[(n_threads - 1) *
309                                        benchmarkruns_per_thread:])
310
311    # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
312    assert sum(len(x) for x in benchmarkrun_segments) == n_benchmarkruns
313
314    # Create and start all readers.
315    cache_readers = [
316        BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
317    ]
318
319    for x in cache_readers:
320      x.start()
321
322    # Wait till all readers finish.
323    for x in cache_readers:
324      x.join()
325
326    # Summarize.
327    self._logger.LogOutput(
328        'Total {} cache hit out of {} benchmark_runs.'.format(
329            len(self._cached_br_list), n_benchmarkruns))
330
331  def get_cached_run_list(self):
332    return self._cached_br_list
333
334  def get_label_map(self):
335    return self._label_brl_map
336
337  def get_experiment(self):
338    return self._experiment
339
340  def get_labels(self, i=None):
341    if i is None:
342      return self._labels
343    return self._labels[i]
344
345  def get_logger(self):
346    return self._logger
347
348  def get_cached_benchmark_run(self):
349    """Get a benchmark_run with 'cache hit'.
350
351    Returns:
352      The benchmark that has cache hit, if any. Otherwise none.
353    """
354
355    with self.lock_on('_cached_br_list'):
356      if self._cached_br_list:
357        return self._cached_br_list.pop()
358      return None
359
360  def get_benchmark_run(self, dut):
361    """Get a benchmark_run (br) object for a certain dut.
362
363    Args:
364      dut: the dut for which a br is returned.
365
366    Returns:
367      A br with its label matching that of the dut. If no such br could be
368      found, return None (this usually means a reimage is required for the
369      dut).
370    """
371
372    # If terminated, stop providing any br.
373    if self._terminated:
374      return None
375
376    # If dut bears an unrecognized label, return None.
377    if dut.label is None:
378      return None
379
380    # If br list for the dut's label is empty (that means all brs for this
381    # label have been done), return None.
382    with self.lock_on(dut.label):
383      brl = self._label_brl_map[dut.label]
384      if not brl:
385        return None
386      # Return the first br.
387      return brl.pop(0)
388
389  def allocate_label(self, dut):
390    """Allocate a label to a dut.
391
392        The work is delegated to MachineImageManager.
393
394        The dut_worker calling this method is responsible for reimage the dut to
395        this label.
396
397    Args:
398      dut: the new label that is to be reimaged onto the dut.
399
400    Returns:
401      The label or None.
402    """
403
404    if self._terminated:
405      return None
406
407    return self._mim.allocate(dut, self)
408
409  def dut_worker_finished(self, dut_worker):
410    """Notify schedv2 that the dut_worker thread finished.
411
412    Args:
413      dut_worker: the thread that is about to end.
414    """
415
416    self._logger.LogOutput('{} finished.'.format(dut_worker))
417    with self._workers_lock:
418      self._active_workers.remove(dut_worker)
419      self._finished_workers.append(dut_worker)
420
421  def is_complete(self):
422    return len(self._active_workers) == 0
423
424  def lock_on(self, my_object):
425    return self._lock_map[my_object]
426
427  def terminate(self):
428    """Mark flag so we stop providing br/reimages.
429
430        Also terminate each DutWorker, so they refuse to execute br or reimage.
431    """
432
433    self._terminated = True
434    for dut_worker in self._active_workers:
435      dut_worker.terminate()
436
437  def threads_status_as_string(self):
438    """Report the dut worker threads status."""
439
440    status = '{} active threads, {} finished threads.\n'.format(
441        len(self._active_workers), len(self._finished_workers))
442    status += '  Active threads:'
443    for dw in self._active_workers:
444      status += '\n    ' + dw.status_str()
445    if self._finished_workers:
446      status += '\n  Finished threads:'
447      for dw in self._finished_workers:
448        status += '\n    ' + dw.status_str()
449    return status
450