1# Copyright 2018 the V8 project authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from .result import SKIPPED
6
7
8"""
9Pipeline
10
11Test processors are chained together and communicate with each other by
12calling previous/next processor in the chain.
13     ----next_test()---->     ----next_test()---->
14Proc1                    Proc2                    Proc3
15     <---result_for()----     <---result_for()----
16
For every next_test call there is exactly one result_for call.
If a processor ignores the test, it has to return SkippedResult.
If it created multiple subtests for one test and wants to pass all of them to
the previous processor, it can enclose them in GroupedResult.
21
22
23Subtests
24
When a test processor needs to modify the test or create some variants of the
test, it creates subtests and sends them to the next processor.
27Each subtest has:
28- procid - globally unique id that should contain id of the parent test and
29          some suffix given by test processor, e.g. its name + subtest type.
30- processor - which created it
31- origin - pointer to the parent (sub)test
32"""
33
34
# Result requirement levels. The numeric order matters: TestProc.setup()
# merges the levels of chained processors with max(), so a larger value wins
# over a smaller one.
# NOTE(review): judging by the names, each level keeps a bit more of the
# result than the one before it (whole result droppable -> only stdout of
# passing tests droppable) - confirm against the output processor.
(
  DROP_RESULT,
  DROP_OUTPUT,
  DROP_PASS_OUTPUT,
  DROP_PASS_STDOUT,
) = range(4)
39
40
class TestProc(object):
  """Base class for a single link of the test processor chain.

  Every processor keeps a reference to its previous and next neighbour.
  Tests are passed forward with next_test() and results are passed backward
  with result_for(); subclasses have to implement both.
  """

  def __init__(self):
    # Neighbours in the chain; filled in by connect_to().
    self._prev_proc = None
    self._next_proc = None
    self._stopped = False
    # Requirement of this processor and the one received through setup().
    self._requirement = DROP_RESULT
    self._prev_requirement = None
    # Applied to outgoing results in _send_result(). Starts as the identity
    # and may be replaced by setup().
    self._reduce_result = lambda res: res

  def connect_to(self, next_proc):
    """Links `next_proc` directly behind this processor."""
    next_proc._prev_proc = self
    self._next_proc = next_proc

  def remove_from_chain(self):
    """Makes both neighbours point at each other, bypassing this processor.

    The links stored on this processor itself are left untouched.
    """
    before, after = self._prev_proc, self._next_proc
    if before:
      before._next_proc = after
    if after:
      after._prev_proc = before

  def setup(self, requirement=DROP_RESULT):
    """
    Tells this processor which part of the result its caller can live
    without, and forwards the combined requirement down the chain.

    Called by the previous processor or by whoever builds the pipeline.

    Args
      requirement: one of the DROP_* levels requested by the caller.
    """
    self._prev_requirement = requirement
    downstream = self._next_proc
    if downstream:
      # The next processor has to satisfy the stricter of both requirements.
      downstream.setup(max(requirement, self._requirement))

    # Dropping only a part of the result does not win us anything, so the
    # result is either dropped completely or passed on as it is. The real
    # reduction happens during result creation (in the output processor),
    # which lets the result stay immutable.
    received = self._prev_requirement
    can_drop_all = (received < self._requirement and
                    received == DROP_RESULT)
    if can_drop_all:
      self._reduce_result = lambda _: None

  def next_test(self, test):
    """
    Entry point for a new test coming from the previous processor.
    Nobody except the previous processor should call this method.
    """
    raise NotImplementedError()

  def result_for(self, test, result):
    """
    Entry point for a result of some test coming from the next processor.
    Nobody except the next processor should call this method.
    """
    raise NotImplementedError()

  def heartbeat(self):
    """Relays the heartbeat to the previous processor, if there is one."""
    upstream = self._prev_proc
    if upstream:
      upstream.heartbeat()

  def stop(self):
    """Marks this processor as stopped and stops both neighbours.

    The flag is set before the neighbours are notified, so the stop() call
    that comes back from them returns right away.
    """
    if self._stopped:
      return
    self._stopped = True
    if self._prev_proc:
      self._prev_proc.stop()
    if self._next_proc:
      self._next_proc.stop()

  @property
  def is_stopped(self):
    """Whether stop() has already been called on this processor."""
    return self._stopped

  ### Communication

  def _send_test(self, test):
    """Passes `test` on to the next processor in the chain."""
    self._next_proc.next_test(test)

  def _send_result(self, test, result):
    """Passes `result` for `test` back to the previous processor.

    Unless the test asks to keep its output, the result first goes through
    the reduction chosen in setup().
    """
    if test.keep_output:
      outgoing = result
    else:
      outgoing = self._reduce_result(result)
    self._prev_proc.result_for(test, outgoing)
119
120
121
class TestProcObserver(TestProc):
  """Pass-through processor with hooks for watching tests and results.

  Each incoming test, result and heartbeat is first handed to the matching
  _on_* hook and then forwarded along the chain. Subclasses override only the
  hooks they care about.
  """

  def __init__(self):
    super(TestProcObserver, self).__init__()

  def next_test(self, test):
    """Runs the test hook, then sends `test` to the next processor."""
    self._on_next_test(test)
    self._send_test(test)

  def result_for(self, test, result):
    """Runs the result hook, then sends `result` to the previous processor."""
    self._on_result_for(test, result)
    self._send_result(test, result)

  def heartbeat(self):
    """Runs the heartbeat hook, then relays the heartbeat as usual."""
    self._on_heartbeat()
    super(TestProcObserver, self).heartbeat()

  def _on_next_test(self, test):
    """Hook invoked with a test received from the previous processor, right
    before it is sent to the next one. Does nothing by default."""

  def _on_result_for(self, test, result):
    """Hook invoked with a result received from the next processor, right
    before it is sent to the previous one. Does nothing by default."""

  def _on_heartbeat(self):
    """Hook invoked on every heartbeat, before it is relayed. Does nothing by
    default."""
151
152
class TestProcProducer(TestProc):
  """Base class for processors that turn a test into subtests."""

  def __init__(self, name):
    """
    Args
      name: processor name, used as the prefix of the ids of its subtests.
    """
    super(TestProcProducer, self).__init__()
    self._name = name

  def next_test(self, test):
    """Delegates the incoming test to the subclass hook _next_test()."""
    self._next_test(test)

  def result_for(self, subtest, result):
    """Delegates to _result_for(), adding the subtest's origin as the test."""
    parent = subtest.origin
    self._result_for(parent, subtest, result)

  ### Implementation
  def _next_test(self, test):
    """Handles a test received from the previous processor.

    Has to be implemented by subclasses.
    """
    raise NotImplementedError()

  def _result_for(self, test, subtest, result):
    """
    Variant of result_for that also receives the parent test.

    Has to be implemented by subclasses.

    Args
      test: test from which the current processor created the subtest.
      subtest: test the `result` belongs to.
      result: execution result of the subtest, created by the output
          processor.
    """
    raise NotImplementedError()

  ### Managing subtests
  def _create_subtest(self, test, subtest_id, **kwargs):
    """Returns a subtest of `test` with the id
    <processor name>-`subtest_id`.

    Extra keyword arguments are handed to test.create_subtest() as they are.
    """
    full_id = '%s-%s' % (self._name, subtest_id)
    return test.create_subtest(self, full_id, **kwargs)
186
187
class TestProcFilter(TestProc):
  """Base class for processors that keep some tests from going further."""

  def next_test(self, test):
    """Forwards `test`, or answers with SKIPPED when it is filtered out."""
    if not self._filter(test):
      self._send_test(test)
      return
    # A filtered test never reaches the next processor; the previous one
    # still gets its single result.
    self._send_result(test, SKIPPED)

  def result_for(self, test, result):
    """Hands the result of a forwarded test back to the previous processor."""
    self._send_result(test, result)

  def _filter(self, test):
    """Tells whether `test` should be filtered out (true means skip it).

    Has to be implemented by subclasses.
    """
    raise NotImplementedError()
203