1# Copyright 2018 the V8 project authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import traceback
7
8from . import base
9from ..local import pool
10
11
# Kept as a module-level function for multiprocessing: pickling a static method
# doesn't work on Windows.
def run_job(job, process_context):
  """Run `job` with `process_context` and hand back whatever it returns.

  Args:
    job: object exposing `run(process_context)`.
    process_context: value forwarded unchanged to `job.run`.

  Returns:
    The return value of `job.run(process_context)`.
  """
  job_result = job.run(process_context)
  return job_result
16
17
def create_process_context(result_reduction):
  """Wrap `result_reduction` in a new ProcessContext.

  Args:
    result_reduction: value stored in the context's `result_reduction` field.

  Returns:
    A ProcessContext instance.
  """
  return ProcessContext(result_reduction=result_reduction)
20
21
# Pairs the id of an executed test (`id`) with the outcome produced for it
# (`result`).
JobResult = collections.namedtuple('JobResult', 'id result')

# Single-field record carrying the `result_reduction` value.
ProcessContext = collections.namedtuple('ProcessContext', 'result_reduction')
24
25
class Job(object):
  """One unit of work: a command to execute plus a processor for its output.

  Attributes:
    test_id: identifier copied into the JobResult returned by run().
    cmd: object whose execute() produces the output.
    outproc: object whose process(output, reduction) produces the result.
    keep_output: when truthy, no reduction is handed to the output processor.
  """

  def __init__(self, test_id, cmd, outproc, keep_output):
    self.test_id, self.cmd = test_id, cmd
    self.outproc, self.keep_output = outproc, keep_output

  def run(self, process_ctx):
    """Execute the command and process its output.

    Args:
      process_ctx: object with a `result_reduction` attribute; it is only read
          when `keep_output` is falsy.

    Returns:
      JobResult holding this job's test id and the processed result.
    """
    output = self.cmd.execute()
    if self.keep_output:
      # Keeping the output means the processor gets no reduction at all.
      reduction = None
    else:
      reduction = process_ctx.result_reduction
    processed = self.outproc.process(output, reduction)
    return JobResult(id=self.test_id, result=processed)
38
39
class ExecutionProc(base.TestProc):
  """Last processor in the chain. Instead of passing tests further it creates
  commands and output processors, executes them in multiple worker processes and
  sends results to the previous processor.
  """

  def __init__(self, jobs, outproc_factory=None):
    """Set up the worker pool and the bookkeeping of in-flight tests.

    Args:
      jobs: forwarded to pool.Pool (presumably the number of worker
          processes -- confirm against pool.Pool).
      outproc_factory: optional callable mapping a test to its output
          processor. When omitted, `test.output_proc` is used.
    """
    super(ExecutionProc, self).__init__()
    self._pool = pool.Pool(jobs)
    self._outproc_factory = outproc_factory or (lambda t: t.output_proc)
    # Tests handed to the pool whose results have not come back yet:
    # test.procid -> (test, cmd).
    self._tests = {}

  def connect_to(self, next_proc):
    """Always fails: this processor is the end of the chain.

    Raises:
      AssertionError: unconditionally.
    """
    # Raised explicitly rather than via `assert False` so that the guard is
    # not stripped when Python runs with -O.
    raise AssertionError('ExecutionProc cannot be connected to anything')

  def run(self):
    """Drain the pool, handling every item it yields until it is exhausted."""
    it = self._pool.imap_unordered(
        fn=run_job,
        # No jobs are supplied up front; next_test() adds them through
        # self._pool.add().
        gen=[],
        process_context_fn=create_process_context,
        # NOTE(review): _prev_requirement is not set in this class; presumably
        # it is provided by base.TestProc -- confirm.
        process_context_args=[self._prev_requirement],
    )
    for pool_result in it:
      self._unpack_result(pool_result)

  def next_test(self, test):
    """Queue `test` for execution; does nothing once the processor is stopped.

    Args:
      test: object providing `procid`, `get_command()` and `keep_output` (and
          `output_proc` when the default output processor factory is in use).
    """
    if self.is_stopped:
      return

    test_id = test.procid
    cmd = test.get_command()
    # Remember the test and its command so that the result coming back from
    # the pool can be matched to them in _unpack_result().
    self._tests[test_id] = test, cmd

    outproc = self._outproc_factory(test)
    self._pool.add([Job(test_id, cmd, outproc, test.keep_output)])

  def result_for(self, test, result):
    """Always fails: nothing follows this processor, so no results arrive.

    Raises:
      AssertionError: unconditionally.
    """
    # Raised explicitly rather than via `assert False` so that the guard is
    # not stripped when Python runs with -O.
    raise AssertionError('ExecutionProc cannot receive results')

  def stop(self):
    """Stop this processor and abort the worker pool."""
    super(ExecutionProc, self).stop()
    self._pool.abort()

  def _unpack_result(self, pool_result):
    """Handle one item yielded by the pool.

    A heartbeat item only triggers self.heartbeat(). Any other item carries a
    JobResult in `pool_result.value`; the matching test is looked up, the
    command is attached to the result as `result.cmd`, and the result is sent
    on via self._send_result().

    Args:
      pool_result: item with a `heartbeat` flag and, for non-heartbeat items,
          a `value` holding a JobResult.
    """
    if pool_result.heartbeat:
      self.heartbeat()
      return

    test_id, result = pool_result.value

    # The test is finished: drop it from the in-flight map while fetching it.
    test, result.cmd = self._tests.pop(test_id)
    self._send_result(test, result)
94