1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Module to optimize the scheduling of benchmark_run tasks."""
5
6
7from __future__ import print_function
8
9import sys
10import test_flag
11import traceback
12
13from collections import defaultdict
14from machine_image_manager import MachineImageManager
15from threading import Lock
16from threading import Thread
17from cros_utils import command_executer
18from cros_utils import logger
19
20
class DutWorker(Thread):
  """Thread that drives all work for one dut.

  The worker first drains benchmark_runs that had a cache hit, then keeps
  asking the scheduler for benchmark_runs matching the dut's current label,
  reimaging the dut whenever the scheduler hands it a new label.
  """

  def __init__(self, dut, sched):
    thread_name = 'DutWorker-{}'.format(dut.name)
    super(DutWorker, self).__init__(name=thread_name)
    self._dut = dut
    self._sched = sched
    # Statistics surfaced through status_str().
    self._stat_num_br_run = 0
    self._stat_num_reimage = 0
    self._stat_annotation = ''
    experiment = self._sched.get_experiment()
    self._logger = logger.GetLogger(experiment.log_dir)
    self.daemon = True
    self._terminated = False
    self._active_br = None
    # _active_br is touched by both _execute_benchmark_run and terminate,
    # possibly from different threads, so every access goes through this lock.
    self._active_br_lock = Lock()

  def terminate(self):
    """Flag this worker as terminated and stop the benchmark_run in flight."""
    self._terminated = True
    with self._active_br_lock:
      running_br = self._active_br
      if running_br is None:
        return
      # BenchmarkRun.Terminate() terminates any running testcase via
      # suite_runner.Terminate and updates timeline.
      running_br.Terminate()

  def run(self):
    """Thread body: the "run-test->(optionally reimage)->run-test" loop.

    Note - 'br' below means 'benchmark_run'.
    """

    # Phase 1: finish every br that was satisfied from the cache.
    while True:
      cached_br = self._sched.get_cached_benchmark_run()
      if not cached_br:
        break
      try:
        self._stat_annotation = 'finishing cached {}'.format(cached_br)
        cached_br.run()
      except RuntimeError:
        traceback.print_exc(file=sys.stdout)

    # Phase 2: brs that really have to execute on the dut.
    self._setup_dut_label()
    try:
      self._logger.LogOutput('{} started.'.format(self))
      while not self._terminated:
        next_br = self._sched.get_benchmark_run(self._dut)
        if next_br is not None:
          self._execute_benchmark_run(next_br)
          continue

        # Nothing left for the current label; ask for another label.
        new_label = self._sched.allocate_label(self._dut)
        if new_label is None:
          # No work under any label either, so this worker is done.
          self._logger.LogOutput('ImageManager found no label '
                                 'for dut, stopping working '
                                 'thread {}.'.format(self))
          break

        if self._reimage(new_label):
          # A non-zero result means the reimage did not succeed; give up on
          # this dut.
          self._logger.LogWarning('Re-image failed, dut '
                                  'in an unstable state, stopping '
                                  'working thread {}.'.format(self))
          break
    finally:
      self._stat_annotation = 'finished'
      # Always tell the scheduler this thread is going away.
      self._sched.dut_worker_finished(self)

  def _reimage(self, label):
    """Reimage the dut so that it carries the given label.

    Args:
      label: the label to reimage onto the dut.

    Returns:
      0 if successful, otherwise 1 (including when the worker has already
      been terminated).
    """

    # Termination may have been requested at any point; honour it first.
    if self._terminated:
      return 1

    self._logger.LogOutput('Reimaging {} using {}'.format(self, label))
    self._stat_num_reimage += 1
    self._stat_annotation = 'reimaging using "{}"'.format(label.name)
    try:
      # Note, only 1 reimage at any given time, this is guaranteed in
      # ImageMachine, so no sync needed below.
      machine_manager = self._sched.get_experiment().machine_manager
      image_failed = machine_manager.ImageMachine(self._dut, label)
      if image_failed:
        return 1
    except RuntimeError:
      return 1

    self._dut.label = label
    return 0

  def _execute_benchmark_run(self, br):
    """Run one benchmark_run on this worker's dut.

    Does nothing if the worker has already been terminated. Whatever happens
    inside br.run(), the experiment is told that the br finished and the
    worker's active br is cleared afterwards.

    Args:
      br: the benchmark_run to execute; its owner_thread must be None.
    """

    # Termination may have been requested at any point; honour it first.
    if self._terminated:
      return

    self._logger.LogOutput('{} started working on {}'.format(self, br))
    self._stat_num_br_run += 1
    self._stat_annotation = 'executing {}'.format(br)
    # br.run() is not expected to raise, but the cleanup below is guarded
    # anyway.
    try:
      assert br.owner_thread is None
      br.owner_thread = self
      with self._active_br_lock:
        self._active_br = br
      br.run()
    finally:
      experiment = self._sched.get_experiment()
      experiment.BenchmarkRunFinished(br)
      with self._active_br_lock:
        self._active_br = None

  def _setup_dut_label(self):
    """Adopt the label whose checksum equals the one stored on the dut.

    Reads the image checksum file from the dut; when it matches one of the
    experiment labels, the dut is tagged with that label so the first
    benchmark_runs can proceed without a reimage. On RuntimeError the dut's
    label is reset to None.
    """

    checksum_file = '/usr/local/osimage_checksum_file'
    try:
      executer = command_executer.GetCommandExecuter()
      rv, checksum, _ = executer.CrosRunCommandWOutput(
          'cat ' + checksum_file,
          chromeos_root=self._sched.get_labels(0).chromeos_root,
          machine=self._dut.name,
          print_to_console=False)
      if rv == 0:
        installed_checksum = checksum.strip()
        matching_label = next(
            (candidate for candidate in self._sched.get_labels()
             if candidate.checksum == installed_checksum), None)
        if matching_label is not None:
          self._logger.LogOutput("Dut '{}' is pre-installed with '{}'".format(
              self._dut.name, matching_label))
          self._dut.label = matching_label
    except RuntimeError:
      traceback.print_exc(file=sys.stdout)
      self._dut.label = None

  def __str__(self):
    current_label = self._dut.label
    if current_label:
      label_name = current_label.name
    else:
      label_name = 'None'
    return 'DutWorker[dut="{}", label="{}"]'.format(self._dut.name, label_name)

  def dut(self):
    """Return the dut this worker is bound to."""
    return self._dut

  def status_str(self):
    """Return a one-line, human readable summary of this worker."""

    current_label = self._dut.label
    if current_label is None:
      label_name = 'None'
    else:
      label_name = current_label.name
    template = ('Worker thread "{}", label="{}", benchmark_run={}, '
                'reimage={}, now {}')
    return template.format(self._dut.name, label_name, self._stat_num_br_run,
                           self._stat_num_reimage, self._stat_annotation)
192
193
class BenchmarkRunCacheReader(Thread):
  """Thread that probes the cache for a batch of benchmark_runs.

  Each instance receives its own br_list (a subset of
  experiment._benchmark_runs) and records every br with a cache hit in the
  scheduler's cached-run list.
  """

  def __init__(self, schedv2, br_list):
    super(BenchmarkRunCacheReader, self).__init__()
    self._schedv2 = schedv2
    self._br_list = br_list
    self._logger = self._schedv2.get_logger()

  def run(self):
    """Read the cache for every br in the batch, collecting the hits."""
    for benchmark_run in self._br_list:
      try:
        benchmark_run.ReadCache()
        if not benchmark_run.cache_hit:
          self._logger.LogOutput('Cache not hit - {}'.format(benchmark_run))
          continue
        self._logger.LogOutput('Cache hit - {}'.format(benchmark_run))
        # The cached-run list is shared between reader threads, so appends
        # are made under the scheduler's lock for it.
        with self._schedv2.lock_on('_cached_br_list'):
          self._schedv2.get_cached_run_list().append(benchmark_run)
      except RuntimeError:
        traceback.print_exc(file=sys.stderr)
219
220
class Schedv2(object):
  """New scheduler for crosperf.

  Owns one DutWorker thread per dut, hands benchmark_runs (brs) to the
  workers grouped by label, and delegates label (re)allocation to
  MachineImageManager.
  """

  def __init__(self, experiment):
    """Build the scheduler state for an experiment.

    Args:
      experiment: the experiment providing log_dir, machine_manager, labels
        and benchmark_runs.
    """
    self._experiment = experiment
    self._logger = logger.GetLogger(experiment.log_dir)

    # Create shortcuts to nested data structure. "_duts" points to a list of
    # locked machines. _labels points to a list of all labels.
    self._duts = self._experiment.machine_manager.GetMachines()
    self._labels = self._experiment.labels

    # Bookkeeping for synchronization. _lock_map lazily creates one Lock per
    # key passed to lock_on().
    self._workers_lock = Lock()
    self._lock_map = defaultdict(Lock)

    # Test mode flag
    self._in_test_mode = test_flag.GetTestMode()

    # Read benchmarkrun cache.
    self._read_br_cache()

    # Mapping from label to a list of benchmark_runs.
    self._label_brl_map = dict((l, []) for l in self._labels)
    for br in self._experiment.benchmark_runs:
      assert br.label in self._label_brl_map
      # Only put no-cache-hit br into the map.
      if br not in self._cached_br_list:
        self._label_brl_map[br.label].append(br)

    # Use machine image manager to calculate initial label allocation.
    self._mim = MachineImageManager(self._labels, self._duts)
    self._mim.compute_initial_allocation()

    # Create worker thread, 1 per dut.
    self._active_workers = [DutWorker(dut, self) for dut in self._duts]
    self._finished_workers = []

    # Termination flag.
    self._terminated = False

  def run_sched(self):
    """Start all dut worker threads and return immediately."""

    for w in self._active_workers:
      w.start()

  @staticmethod
  def _split_benchmark_runs(benchmark_runs):
    """Partition benchmark_runs into contiguous segments, one per thread.

    The number of segments is ceil(n / 4) clamped to the range [2, 20]. Floor
    division (//) is used throughout so the arithmetic yields integers on
    both Python 2 and Python 3.

    Args:
      benchmark_runs: a sliceable sequence of benchmark_runs.

    Returns:
      A list of slices of benchmark_runs which, concatenated in order, equal
      the input. Trailing segments may be empty.
    """
    n_benchmarkruns = len(benchmark_runs)
    # (x + 3) // 4 mimics math.ceil(x / 4) with integers only.
    n_threads = max(2, min(20, (n_benchmarkruns + 3) // 4))
    per_thread = (n_benchmarkruns + n_threads - 1) // n_threads
    segments = [
        benchmark_runs[i * per_thread:(i + 1) * per_thread]
        for i in range(n_threads - 1)
    ]
    # The last segment takes whatever remains.
    segments.append(benchmark_runs[(n_threads - 1) * per_thread:])
    return segments

  def _read_br_cache(self):
    """Use multi-threading to read cache for all benchmarkruns.

    We do this by firstly creating a few threads, and then assign each
    thread a segment of all brs. Each thread will check cache status for
    each br and put those with cache into '_cached_br_list'. With four or
    fewer brs the cache is read inline on the calling thread instead.
    """

    self._cached_br_list = []
    benchmark_runs = self._experiment.benchmark_runs
    n_benchmarkruns = len(benchmark_runs)
    if n_benchmarkruns <= 4:
      # Use single thread to read cache.
      self._logger.LogOutput(('Starting to read cache status for '
                              '{} benchmark runs ...').format(n_benchmarkruns))
      BenchmarkRunCacheReader(self, benchmark_runs).run()
      return

    # Split benchmarkruns set into segments. Each segment will be handled by
    # a thread.
    benchmarkrun_segments = self._split_benchmark_runs(benchmark_runs)
    n_threads = len(benchmarkrun_segments)
    self._logger.LogOutput(('Starting {} threads to read cache status for '
                            '{} benchmark runs ...').format(n_threads,
                                                            n_benchmarkruns))

    # Assert: aggregation of benchmarkrun_segments equals to benchmark_runs.
    assert sum(len(x) for x in benchmarkrun_segments) == n_benchmarkruns

    # Create and start all readers.
    cache_readers = [
        BenchmarkRunCacheReader(self, x) for x in benchmarkrun_segments
    ]

    for x in cache_readers:
      x.start()

    # Wait till all readers finish.
    for x in cache_readers:
      x.join()

    # Summarize.
    self._logger.LogOutput(
        'Total {} cache hit out of {} benchmark_runs.'.format(
            len(self._cached_br_list), n_benchmarkruns))

  def get_cached_run_list(self):
    """Return the (mutable) list of brs that had a cache hit."""
    return self._cached_br_list

  def get_label_map(self):
    """Return the mapping from label to its list of pending brs."""
    return self._label_brl_map

  def get_experiment(self):
    """Return the experiment this scheduler was created for."""
    return self._experiment

  def get_labels(self, i=None):
    """Return all labels, or the i-th label when an index is given.

    Args:
      i: optional index into the label list.

    Returns:
      The whole label list if i is None, otherwise the label at index i.
    """
    if i is None:
      return self._labels
    return self._labels[i]

  def get_logger(self):
    """Return the scheduler's logger."""
    return self._logger

  def get_cached_benchmark_run(self):
    """Get a benchmark_run with 'cache hit'.

    Returns:
      The benchmark that has cache hit, if any. Otherwise none.
    """

    with self.lock_on('_cached_br_list'):
      if self._cached_br_list:
        return self._cached_br_list.pop()
      return None

  def get_benchmark_run(self, dut):
    """Get a benchmark_run (br) object for a certain dut.

    Args:
      dut: the dut for which a br is returned.

    Returns:
      A br with its label matching that of the dut. If no such br could be
      found, return None (this usually means a reimage is required for the
      dut).
    """

    # If terminated, stop providing any br.
    if self._terminated:
      return None

    # If dut bears an unrecognized label, return None.
    if dut.label is None:
      return None

    # If br list for the dut's label is empty (that means all brs for this
    # label have been done), return None.
    with self.lock_on(dut.label):
      brl = self._label_brl_map[dut.label]
      if not brl:
        return None
      # Return the first br.
      return brl.pop(0)

  def allocate_label(self, dut):
    """Allocate a label to a dut.

    The work is delegated to MachineImageManager.

    The dut_worker calling this method is responsible for reimage the dut to
    this label.

    Args:
      dut: the dut that a new label is requested for.

    Returns:
      The label or None.
    """

    if self._terminated:
      return None

    return self._mim.allocate(dut, self)

  def dut_worker_finished(self, dut_worker):
    """Notify schedv2 that the dut_worker thread finished.

    Args:
      dut_worker: the thread that is about to end.
    """

    self._logger.LogOutput('{} finished.'.format(dut_worker))
    with self._workers_lock:
      self._active_workers.remove(dut_worker)
      self._finished_workers.append(dut_worker)

  def is_complete(self):
    """Return True once no worker is left in the active list."""
    return not self._active_workers

  def lock_on(self, my_object):
    """Return the lock associated with my_object, creating it on first use."""
    return self._lock_map[my_object]

  def terminate(self):
    """Mark flag so we stop providing br/reimages.

    Also terminate each DutWorker, so they refuse to execute br or reimage.
    """

    self._terminated = True
    # Workers remove themselves from _active_workers (under _workers_lock)
    # when they finish, so iterate over a snapshot; iterating the live list
    # while it shrinks would skip workers. The lock is released before
    # calling into the workers.
    with self._workers_lock:
      workers = list(self._active_workers)
    for dut_worker in workers:
      dut_worker.terminate()

  def threads_status_as_string(self):
    """Report the dut worker threads status."""

    pieces = [
        '{} active threads, {} finished threads.\n'.format(
            len(self._active_workers), len(self._finished_workers)),
        '  Active threads:',
    ]
    pieces.extend('\n    ' + dw.status_str() for dw in self._active_workers)
    if self._finished_workers:
      pieces.append('\n  Finished threads:')
      pieces.extend('\n    ' + dw.status_str()
                    for dw in self._finished_workers)
    return ''.join(pieces)
440