1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from acts.controllers.monsoon_lib.sampling.engine.assembly_line import BufferList
20from acts.controllers.monsoon_lib.sampling.engine.assembly_line import BufferStream
21from acts.controllers.monsoon_lib.sampling.engine.assembly_line import DevNullBufferStream
22from acts.controllers.monsoon_lib.sampling.engine.assembly_line import IndexedBuffer
23
24
25class Transformer(object):
26    """An object that represents how to transform a given buffer into a result.
27
28    Attributes:
29        output_stream: The stream to output data to upon transformation.
30            Defaults to a DevNullBufferStream.
31    """
32
33    def __init__(self):
34        self.output_stream = DevNullBufferStream(None)
35
36    def set_output_stream(self, output_stream):
37        """Sets the Transformer's output stream to the given output stream."""
38        self.output_stream = output_stream
39
40    def transform(self, input_stream):
41        """Transforms input_stream data and passes it to self.output_stream.
42
43        Args:
44            input_stream: The BufferStream of input data this transformer should
45                transform. Note that the type of data stored within BufferStream
46                is not guaranteed to be in the format expected, much like STDIN
47                is not guaranteed to be the format a process expects. However,
48                for performance, users should expect the data to be properly
49                formatted anyway.
50        """
51        input_stream.initialize()
52        self.output_stream.initialize()
53        class_name = self.__class__.__qualname__
54        try:
55            logging.debug('%s transformer beginning.', class_name)
56            self.on_begin()
57            logging.debug('%s transformation started.', class_name)
58            self._transform(input_stream)
59        except Exception:
60            # TODO(markdr): Get multi-process error reporting to play nicer.
61            logging.exception('%s ran into an exception.', class_name)
62            raise
63        finally:
64            logging.debug('%s transformation ended.', class_name)
65            self.on_end()
66            logging.debug('%s finished.', class_name)
67
68    def _transform_buffer(self, buffer):
69        """Transforms a given buffer.
70
71        The implementation can either:
72
73        1) Return the transformed buffer. Can be either in-place or a new
74           buffer.
75
76        2) Return a BufferList: a list of transformed buffers. This is useful
77           for grouping data together for faster operations.
78
79        Args:
80            buffer: The buffer to transform
81
82        Returns:
83            either a buffer or a BufferList. See detailed documentation.
84        """
85        raise NotImplementedError()
86
87    def _on_end_of_stream(self, input_stream):
88        """To be called when the input stream has sent the end of stream signal.
89
90        This is particularly useful for flushing any stored memory into the
91        output stream.
92
93        Args:
94            input_stream: the stream that was closed.
95        """
96        # By default, this function closes the output stream.
97        self.output_stream.end_stream()
98
99    def _transform(self, input_stream):
100        """Should call _transform_buffer within this function."""
101        raise NotImplementedError()
102
103    def on_begin(self):
104        """A function called before the transform loop begins."""
105        pass
106
107    def on_end(self):
108        """A function called after the transform loop has ended."""
109        pass
110
111
112class SourceTransformer(Transformer):
113    """The base class for generating data in an AssemblyLine.
114
115    Note that any Transformer will be able to generate data, but this class is
116    a generic way to send data.
117
118    Attributes:
119        _buffer_size: The buffer size for each IndexedBuffer sent over the
120            output stream.
121    """
122
123    def __init__(self):
124        super().__init__()
125        # Defaulted to 64, which is small enough to be passed within the .6ms
126        # window, but large enough so that it does not spam the queue.
127        self._buffer_size = 64
128
129    def _transform(self, _):
130        """Generates data and sends it to the output stream."""
131        buffer_index = 0
132        while True:
133            indexed_buffer = IndexedBuffer(buffer_index, self._buffer_size)
134            buffer = self._transform_buffer(indexed_buffer.buffer)
135            if buffer is BufferStream.END:
136                break
137            indexed_buffer.buffer = buffer
138            self.output_stream.add_indexed_buffer(indexed_buffer)
139            buffer_index += 1
140
141        self.output_stream.end_stream()
142
143    def _transform_buffer(self, buffer):
144        """Fills the passed-in buffer with data."""
145        raise NotImplementedError()
146
147
148class SequentialTransformer(Transformer):
149    """A transformer that receives input in sequential order.
150
151    Attributes:
152        _next_index: The index of the next IndexedBuffer that should be read.
153    """
154
155    def __init__(self):
156        super().__init__()
157        self._next_index = 0
158
159    def _transform(self, input_stream):
160        while True:
161            indexed_buffer = input_stream.remove_indexed_buffer()
162            if indexed_buffer is BufferStream.END:
163                break
164            buffer_or_buffers = self._transform_buffer(indexed_buffer.buffer)
165            if buffer_or_buffers is not None:
166                self._send_buffers(buffer_or_buffers)
167
168        self._on_end_of_stream(input_stream)
169
170    def _send_buffers(self, buffer_or_buffer_list):
171        """Sends buffers over to the output_stream.
172
173        Args:
174            buffer_or_buffer_list: A BufferList or buffer object. Note that if
175                buffer is None, it is effectively an end-of-stream signal.
176        """
177        if not isinstance(buffer_or_buffer_list, BufferList):
178            # Assume a single buffer was returned
179            buffer_or_buffer_list = BufferList([buffer_or_buffer_list])
180
181        buffer_list = buffer_or_buffer_list
182        for buffer in buffer_list:
183            new_buffer = IndexedBuffer(self._next_index, buffer)
184            self.output_stream.add_indexed_buffer(new_buffer)
185            self._next_index += 1
186
187    def _transform_buffer(self, buffer):
188        raise NotImplementedError()
189
190
191class ParallelTransformer(Transformer):
192    """A Transformer that is capable of running in parallel.
193
194    Buffers received may be unordered. For ordered input, use
195    SequentialTransformer.
196    """
197
198    def _transform(self, input_stream):
199        while True:
200            indexed_buffer = input_stream.remove_indexed_buffer()
201            if indexed_buffer is None:
202                break
203            buffer = self._transform_buffer(indexed_buffer.buffer)
204            indexed_buffer.buffer = buffer
205            self.output_stream.add_indexed_buffer(indexed_buffer)
206
207        self._on_end_of_stream(input_stream)
208
209    def _transform_buffer(self, buffer):
210        """Transforms a given buffer.
211
212        Note that ParallelTransformers can NOT return a BufferList. This is a
213        limitation with the current indexing system. If the input buffer is
214        replaced with multiple buffers, later transformers will not know what
215        the proper order of buffers is.
216
217        Args:
218            buffer: The buffer to transform
219
220        Returns:
221            either None or a buffer. See detailed documentation.
222        """
223        raise NotImplementedError()
224