1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18from concurrent.futures import ThreadPoolExecutor
19import multiprocessing
20
21
class AssemblyLine(object):
    """Base class for moving data through an ordered chain of nodes.

    How the nodes are scheduled is left to subclasses, which must implement
    run().

    Attributes:
        nodes: The list of AssemblyLine.Node objects that make up the line,
            exactly as given to the constructor.
    """

    class Node(object):
        """One stage of an AssemblyLine.

        A node pairs a transformer with the stream it reads from:

         input_stream                    output_stream
        ==============> [ transformer ] ===============>

        Attributes:
            transformer: The Transformer that consumes data from the input
                stream and sends its results to the output stream.
            input_stream: The stream this node's transformer reads from. It is
                the same stream that is registered as the previous node's
                output stream.

        Properties:
            output_stream: The stream handed to the next node. It is not
                stored on the node; reads and writes are forwarded to
                transformer.output_stream.
        """

        def __init__(self, transformer=None, input_stream=None):
            self.input_stream = input_stream
            self.transformer = transformer

        @property
        def output_stream(self):
            """The transformer's output stream."""
            # The transformer owns the output stream; the node only proxies it.
            return self.transformer.output_stream

        @output_stream.setter
        def output_stream(self, value):
            self.transformer.output_stream = value

    def __init__(self, nodes):
        """Creates an AssemblyLine from already-connected nodes.

        Args:
            nodes: A list of AssemblyLine.Node objects.
        """
        self.nodes = nodes

    def run(self):
        """Runs the AssemblyLine, passing the data between each work node.

        Raises:
            NotImplementedError: Always; subclasses provide the scheduling.
        """
        raise NotImplementedError
73
74
class ProcessAssemblyLine(AssemblyLine):
    """An AssemblyLine whose nodes are scheduled on a pool of processes."""

    def run(self):
        """Runs every node's transformer inside a multiprocessing.Pool.

        The pool is sized to one process per node. Each node's
        transformer.transform is scheduled asynchronously with that node's
        input stream as its only argument, after which the pool is closed and
        joined. The results of the scheduled calls are not collected.
        """
        if not self.nodes:
            # A pool cannot be created with zero processes (it raises a
            # ValueError), so an empty line has nothing to run.
            return

        worker_count = len(self.nodes)
        pool = multiprocessing.Pool(processes=worker_count)
        for stage in self.nodes:
            pool.apply_async(stage.transformer.transform,
                             (stage.input_stream,))
        # No more work will be submitted; wait for the workers to exit.
        pool.close()
        pool.join()
91
92
class ThreadAssemblyLine(AssemblyLine):
    """An AssemblyLine that uses threading to schedule work on nodes."""

    def run(self):
        """Runs the AssemblyLine within a thread pool.

        The pool is sized to one worker thread per node. Each node's
        transformer.transform is submitted with that node's input stream as
        its only argument. The pool is used as a context manager, so this
        method returns only after the pool has been shut down. The futures
        returned by submit() are not inspected.
        """
        if not self.nodes:
            # ThreadPoolExecutor raises a ValueError when max_workers is 0, so
            # an empty line is treated as a no-op, matching
            # ProcessAssemblyLine.
            return

        with ThreadPoolExecutor(max_workers=len(self.nodes)) as thread_pool:
            for node in self.nodes:
                thread_pool.submit(node.transformer.transform,
                                   node.input_stream)
102
103
class AssemblyLineBuilder(object):
    """Base builder that chains transformers together into an AssemblyLine.

    Nodes are added with source() followed by any number of into() calls, and
    build() hands the finished node list to the assembly line generator.
    source() and into() return the builder so that calls can be chained.

    Attributes:
        nodes: The AssemblyLine.Node objects added so far, in order.

        _assembly_line_generator: The callable that creates the AssemblyLine.
            Should be in the form of:

                Args:
                    A list of AssemblyLine.Node objects.

                Returns:
                    An AssemblyLine object.

        _queue_generator: The callable that creates new queues to be used for
            BufferStreams. Should be in the form of:

                Args:
                    None.

                Returns:
                    A Queue object.
    """

    def __init__(self, queue_generator, assembly_line_generator):
        """Creates an AssemblyLineBuilder with no nodes.

        Args:
            queue_generator: A callable of type lambda: Queue().
            assembly_line_generator: A callable of type
                lambda list<AssemblyLine.Node>: AssemblyLine.
        """
        super().__init__()
        self.nodes = []
        self._built = False

        self._queue_generator = queue_generator
        self._assembly_line_generator = assembly_line_generator

    @property
    def built(self):
        """True once build() has been called on a non-empty builder."""
        return self._built

    def __generate_queue(self):
        """Creates a fresh queue for the stream between two nodes."""
        return self._queue_generator()

    @property
    def queue_generator(self):
        """The callable this builder uses to create queues."""
        return self._queue_generator

    def source(self, transformer, input_stream=None):
        """Adds the SourceTransformer that starts the AssemblyLine.

        Must be the first function call on the AssemblyLineBuilder.

        Args:
            transformer: The SourceTransformer that generates data for the
                AssemblyLine to process.
            input_stream: The input stream for the source node. When None, a
                DevNullBufferStream is used.

        Returns:
            This builder, to allow chaining.

        Raises:
            ValueError if a node has already been added. This is also the case
                once the AssemblyLine has been built, since build() requires
                at least one node.
        """
        if self.nodes:
            raise ValueError('AssemblyLines can only have a single source.')
        first_stream = (DevNullBufferStream()
                        if input_stream is None else input_stream)
        self.nodes.append(AssemblyLine.Node(transformer, first_stream))
        return self

    def into(self, transformer):
        """Appends the given transformer to the end of the AssemblyLine.

        A new BufferStream, backed by a freshly generated queue, is set as the
        previous transformer's output stream and used as the new node's input
        stream.

        Args:
            transformer: The transformer next in the AssemblyLine.

        Returns:
            This builder, to allow chaining.

        Raises:
            ValueError if no source node is set, or the AssemblyLine has been
                built.
        """
        if not self.nodes:
            raise ValueError('The source transformer must be set first.')
        if self.built:
            raise ValueError(
                'Cannot add additional nodes after the '
                'AssemblyLine has been built.')
        link = BufferStream(self.__generate_queue())
        previous_node = self.nodes[-1]
        previous_node.transformer.set_output_stream(link)
        self.nodes.append(AssemblyLine.Node(transformer, link))
        return self

    def build(self, output_stream=None):
        """Builds the AssemblyLine object.

        Note that after this function is called this AssemblyLineBuilder cannot
        be used again, as it is already marked as built.

        Args:
            output_stream: The stream to set as the last node's output stream.
                When None, a DevNullBufferStream is used.

        Returns:
            Whatever the assembly line generator returns for the node list.

        Raises:
            ValueError if the builder has already been built, or no nodes have
                been added.
        """
        if self.built:
            raise ValueError('The AssemblyLine is already built.')
        if not self.nodes:
            raise ValueError('Cannot create an empty assembly line.')
        # The builder is marked as built before the final stream is attached.
        self._built = True
        final_stream = (DevNullBufferStream()
                        if output_stream is None else output_stream)
        self.nodes[-1].output_stream = final_stream
        return self._assembly_line_generator(self.nodes)
211
212
class ThreadAssemblyLineBuilder(AssemblyLineBuilder):
    """Builds ThreadAssemblyLines, using queue.Queue streams by default."""

    def __init__(self, queue_generator=queue.Queue):
        """Creates a builder whose build() yields a ThreadAssemblyLine.

        Args:
            queue_generator: A callable taking no arguments that returns a new
                queue for each stream between nodes. Defaults to queue.Queue.
        """
        super().__init__(queue_generator=queue_generator,
                         assembly_line_generator=ThreadAssemblyLine)
218
219
class ProcessAssemblyLineBuilder(AssemblyLineBuilder):
    """Builds ProcessAssemblyLines whose streams use manager-backed queues.

    Attributes:
        manager: The multiprocessing.Manager used for having queues communicate
            with one another over multiple processes.
    """

    def __init__(self):
        """Starts a multiprocessing.Manager and uses it to create queues."""
        # The manager must exist before the base initializer runs, because its
        # Queue factory is handed over as the queue generator.
        self.manager = multiprocessing.Manager()
        super().__init__(queue_generator=self.manager.Queue,
                         assembly_line_generator=ProcessAssemblyLine)
231
232
class IndexedBuffer(object):
    """A buffer tagged with the index of the order it was generated in.

    Attributes:
        index: The index given at construction.
        buffer: The list holding the buffer's contents.
    """

    def __init__(self, index, size_or_buffer):
        """Creates an IndexedBuffer.

        Args:
            index: The integer index associated with the buffer.
            size_or_buffer:
                either:
                    An integer number of slots, in which case the buffer is a
                    new list of that many None entries, OR
                    A list, which is stored as the buffer without copying.
        """
        self.index = index
        # An int is a slot count; anything else is used directly as the buffer.
        self.buffer = ([None] * size_or_buffer
                       if isinstance(size_or_buffer, int) else size_or_buffer)
251
252
class BufferList(list):
    """A plain list subclass used to mark a list of Buffers.

    It adds no behavior of its own. Its only purpose is to let code tell
    apart a transformer that returned a single buffer from one that returned
    a list of buffers.
    """
260
261
class BufferStream(object):
    """An object that acts as a stream between two transformers.

    The stream is a thin wrapper around a queue: writers add buffers and
    finally the END marker, and readers remove items until they see END.
    """

    # The object passed to the buffer queue to signal the end-of-stream.
    END = None

    def __init__(self, buffer_queue):
        """Creates a new BufferStream.

        Args:
            buffer_queue: A Queue object used to pass data along the
                BufferStream.
        """
        self._buffer_queue = buffer_queue

    def initialize(self):
        """Initializes the stream.

        When running BufferStreams through multiprocessing, initialize must
        only be called on the process using the BufferStream.
        """
        # Here we need to make any call to the stream to initialize it. This
        # makes read and write times for the first buffer faster, preventing
        # the data at the beginning from being dropped.
        self._buffer_queue.qsize()

    def end_stream(self):
        """Closes the stream by writing the END marker to the queue.

        By convention, END is a None object, mirroring file reads returning
        an empty string when the end of file is reached.

        The marker is written without blocking, so any exception the queue
        raises when it cannot accept the item immediately is propagated.
        """
        # Use the class-level sentinel rather than a literal None so END is
        # the single definition of the end-of-stream marker.
        self._buffer_queue.put(self.END, block=False)

    def add_indexed_buffer(self, buffer):
        """Adds the given buffer to the buffer stream.

        The buffer is written without blocking, so any exception the queue
        raises when it cannot accept the item immediately is propagated.

        Args:
            buffer: The buffer to pass along the stream.
        """
        self._buffer_queue.put(buffer, block=False)

    def remove_indexed_buffer(self):
        """Removes the next item from the stream.

        This operation blocks until data is received.

        Returns:
            The next item written to the stream: a buffer previously added
            with add_indexed_buffer, or END once the stream has been ended.
        """
        return self._buffer_queue.get()
309
310
class DevNullBufferStream(BufferStream):
    """A BufferStream that is always empty.

    Writes are discarded and every read immediately returns the end-of-stream
    marker. No queue backs this stream.
    """

    def __init__(self, *_):
        """Creates the stream; any positional arguments are ignored."""
        super().__init__(None)

    def initialize(self):
        """Does nothing. Nothing to initialize."""
        pass

    def end_stream(self):
        """Does nothing. The stream always returns end-of-stream when read."""
        pass

    def add_indexed_buffer(self, buffer):
        """Imitating /dev/null, nothing will be written to the stream."""
        pass

    def remove_indexed_buffer(self):
        """Always returns the end-of-stream marker, without blocking."""
        # Return the inherited sentinel rather than a literal None so this
        # stream stays in step with BufferStream.END.
        return self.END
332