1 /*
2  * Copyright 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /*
18  * FlowGraph.h
19  *
20  * Processing node and ports that can be used in a simple data flow graph.
21  * This was designed to work with audio but could be used for other
22  * types of data.
23  */
24 
25 #ifndef FLOWGRAPH_FLOW_GRAPH_NODE_H
26 #define FLOWGRAPH_FLOW_GRAPH_NODE_H
27 
#include <cassert>
#include <cstdint>
#include <cstring>
#include <functional>
#include <math.h>
#include <memory>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <vector>
36 
37 // TODO Move these classes into separate files.
38 // TODO Review use of raw pointers for connect(). Maybe use smart pointers but need to avoid
39 //      run-time deallocation in audio thread.
40 
41 // Set this to 1 if using it inside the Android framework.
42 // This code is kept here so that it can be moved easily between Oboe and AAudio.
43 #ifndef FLOWGRAPH_ANDROID_INTERNAL
44 #define FLOWGRAPH_ANDROID_INTERNAL 0
45 #endif
46 
47 // Set this to a name that will prevent AAudio from calling into Oboe.
48 // AAudio and Oboe both use a version of this flowgraph package.
49 // There was a problem in the unit tests where AAudio would call a constructor
50 // in AAudio and then call a destructor in Oboe! That caused memory corruption.
51 // For more details, see Issue #930.
52 #ifndef FLOWGRAPH_OUTER_NAMESPACE
53 #define FLOWGRAPH_OUTER_NAMESPACE oboe
54 #endif
55 
56 namespace FLOWGRAPH_OUTER_NAMESPACE {
57 namespace flowgraph {
58 
// Default number of frames per port buffer. A different size can be requested
// when a FlowGraphPortFloat is created.
// Too small a value adds overhead from switching between nodes.
// Too large a value thrashes the caches.
constexpr int kDefaultBufferSize{8}; // arbitrary
63 
64 class FlowGraphPort;
65 class FlowGraphPortFloatInput;
66 
67 /***************************************************************************/
68 /**
69  * Base class for all nodes in the flowgraph.
70  */
71 class FlowGraphNode {
72 public:
FlowGraphNode()73     FlowGraphNode() {}
74     virtual ~FlowGraphNode() = default;
75 
76     /**
77      * Read from the input ports,
78      * generate multiple frames of data then write the results to the output ports.
79      *
80      * @param numFrames maximum number of frames requested for processing
81      * @return number of frames actually processed
82      */
83     virtual int32_t onProcess(int32_t numFrames) = 0;
84 
85     /**
86      * If the callCount is at or after the previous callCount then call
87      * pullData on all of the upstreamNodes.
88      * Then call onProcess().
89      * This prevents infinite recursion in case of cyclic graphs.
90      * It also prevents nodes upstream from a branch from being executed twice.
91      *
92      * @param callCount
93      * @param numFrames
94      * @return number of frames valid
95      */
96     int32_t pullData(int32_t numFrames, int64_t callCount);
97 
98     /**
99      * Recursively reset all the nodes in the graph, starting from a Sink.
100      *
101      * This must not be called at the same time as pullData!
102      */
103     void pullReset();
104 
105     /**
106      * Reset framePosition counters.
107      */
108     virtual void reset();
109 
addInputPort(FlowGraphPort & port)110     void addInputPort(FlowGraphPort &port) {
111         mInputPorts.push_back(port);
112     }
113 
isDataPulledAutomatically()114     bool isDataPulledAutomatically() const {
115         return mDataPulledAutomatically;
116     }
117 
118     /**
119      * Set true if you want the data pulled through the graph automatically.
120      * This is the default.
121      *
122      * Set false if you want to pull the data from the input ports in the onProcess() method.
123      * You might do this, for example, in a sample rate converting node.
124      *
125      * @param automatic
126      */
setDataPulledAutomatically(bool automatic)127     void setDataPulledAutomatically(bool automatic) {
128         mDataPulledAutomatically = automatic;
129     }
130 
getName()131     virtual const char *getName() {
132         return "FlowGraph";
133     }
134 
getLastCallCount()135     int64_t getLastCallCount() {
136         return mLastCallCount;
137     }
138 
139 protected:
140 
141     static constexpr int64_t  kInitialCallCount = -1;
142     int64_t  mLastCallCount = kInitialCallCount;
143 
144     std::vector<std::reference_wrapper<FlowGraphPort>> mInputPorts;
145 
146 private:
147     bool     mDataPulledAutomatically = true;
148     bool     mBlockRecursion = false;
149     int32_t  mLastFrameCount = 0;
150 
151 };
152 
153 /***************************************************************************/
154 /**
155   * This is a connector that allows data to flow between modules.
156   *
157   * The ports are the primary means of interacting with a module.
158   * So they are generally declared as public.
159   *
160   */
161 class FlowGraphPort {
162 public:
FlowGraphPort(FlowGraphNode & parent,int32_t samplesPerFrame)163     FlowGraphPort(FlowGraphNode &parent, int32_t samplesPerFrame)
164             : mContainingNode(parent)
165             , mSamplesPerFrame(samplesPerFrame) {
166     }
167 
168     virtual ~FlowGraphPort() = default;
169 
170     // Ports are often declared public. So let's make them non-copyable.
171     FlowGraphPort(const FlowGraphPort&) = delete;
172     FlowGraphPort& operator=(const FlowGraphPort&) = delete;
173 
getSamplesPerFrame()174     int32_t getSamplesPerFrame() const {
175         return mSamplesPerFrame;
176     }
177 
178     virtual int32_t pullData(int64_t framePosition, int32_t numFrames) = 0;
179 
pullReset()180     virtual void pullReset() {}
181 
182 protected:
183     FlowGraphNode &mContainingNode;
184 
185 private:
186     const int32_t    mSamplesPerFrame = 1;
187 };
188 
189 /***************************************************************************/
190 /**
191  * This port contains a 32-bit float buffer that can contain several frames of data.
192  * Processing the data in a block improves performance.
193  *
194  * The size is framesPerBuffer * samplesPerFrame).
195  */
196 class FlowGraphPortFloat  : public FlowGraphPort {
197 public:
198     FlowGraphPortFloat(FlowGraphNode &parent,
199                    int32_t samplesPerFrame,
200                    int32_t framesPerBuffer = kDefaultBufferSize
201                 );
202 
203     virtual ~FlowGraphPortFloat() = default;
204 
getFramesPerBuffer()205     int32_t getFramesPerBuffer() const {
206         return mFramesPerBuffer;
207     }
208 
209 protected:
210 
211     /**
212      * @return buffer internal to the port or from a connected port
213      */
getBuffer()214     virtual float *getBuffer() {
215         return mBuffer.get();
216     }
217 
218 private:
219     const int32_t    mFramesPerBuffer = 1;
220     std::unique_ptr<float[]> mBuffer; // allocated in constructor
221 };
222 
223 /***************************************************************************/
224 /**
225   * The results of a node's processing are stored in the buffers of the output ports.
226   */
227 class FlowGraphPortFloatOutput : public FlowGraphPortFloat {
228 public:
FlowGraphPortFloatOutput(FlowGraphNode & parent,int32_t samplesPerFrame)229     FlowGraphPortFloatOutput(FlowGraphNode &parent, int32_t samplesPerFrame)
230             : FlowGraphPortFloat(parent, samplesPerFrame) {
231     }
232 
233     virtual ~FlowGraphPortFloatOutput() = default;
234 
235     using FlowGraphPortFloat::getBuffer;
236 
237     /**
238      * Connect to the input of another module.
239      * An input port can only have one connection.
240      * An output port can have multiple connections.
241      * If you connect a second output port to an input port
242      * then it overwrites the previous connection.
243      *
244      * This not thread safe. Do not modify the graph topology from another thread while running.
245      * Also do not delete a module while it is connected to another port if the graph is running.
246      */
247     void connect(FlowGraphPortFloatInput *port);
248 
249     /**
250      * Disconnect from the input of another module.
251      * This not thread safe.
252      */
253     void disconnect(FlowGraphPortFloatInput *port);
254 
255     /**
256      * Call the parent module's onProcess() method.
257      * That may pull data from its inputs and recursively
258      * process the entire graph.
259      * @return number of frames actually pulled
260      */
261     int32_t pullData(int64_t framePosition, int32_t numFrames) override;
262 
263 
264     void pullReset() override;
265 
266 };
267 
268 /***************************************************************************/
269 
270 /**
271  * An input port for streaming audio data.
272  * You can set a value that will be used for processing.
273  * If you connect an output port to this port then its value will be used instead.
274  */
275 class FlowGraphPortFloatInput : public FlowGraphPortFloat {
276 public:
FlowGraphPortFloatInput(FlowGraphNode & parent,int32_t samplesPerFrame)277     FlowGraphPortFloatInput(FlowGraphNode &parent, int32_t samplesPerFrame)
278             : FlowGraphPortFloat(parent, samplesPerFrame) {
279         // Add to parent so it can pull data from each input.
280         parent.addInputPort(*this);
281     }
282 
283     virtual ~FlowGraphPortFloatInput() = default;
284 
285     /**
286      * If connected to an output port then this will return
287      * that output ports buffers.
288      * If not connected then it returns the input ports own buffer
289      * which can be loaded using setValue().
290      */
291     float *getBuffer() override;
292 
293     /**
294      * Write every value of the float buffer.
295      * This value will be ignored if an output port is connected
296      * to this port.
297      */
setValue(float value)298     void setValue(float value) {
299         int numFloats = kDefaultBufferSize * getSamplesPerFrame();
300         float *buffer = getBuffer();
301         for (int i = 0; i < numFloats; i++) {
302             *buffer++ = value;
303         }
304     }
305 
306     /**
307      * Connect to the output of another module.
308      * An input port can only have one connection.
309      * An output port can have multiple connections.
310      * This not thread safe.
311      */
connect(FlowGraphPortFloatOutput * port)312     void connect(FlowGraphPortFloatOutput *port) {
313         assert(getSamplesPerFrame() == port->getSamplesPerFrame());
314         mConnected = port;
315     }
316 
disconnect(FlowGraphPortFloatOutput * port)317     void disconnect(FlowGraphPortFloatOutput *port) {
318         assert(mConnected == port);
319         (void) port;
320         mConnected = nullptr;
321     }
322 
disconnect()323     void disconnect() {
324         mConnected = nullptr;
325     }
326 
327     /**
328      * Pull data from any output port that is connected.
329      */
330     int32_t pullData(int64_t framePosition, int32_t numFrames) override;
331 
332     void pullReset() override;
333 
334 private:
335     FlowGraphPortFloatOutput *mConnected = nullptr;
336 };
337 
338 /***************************************************************************/
339 
340 /**
341  * Base class for an edge node in a graph that has no upstream nodes.
342  * It outputs data but does not consume data.
343  * By default, it will read its data from an external buffer.
344  */
345 class FlowGraphSource : public FlowGraphNode {
346 public:
FlowGraphSource(int32_t channelCount)347     explicit FlowGraphSource(int32_t channelCount)
348             : output(*this, channelCount) {
349     }
350 
351     virtual ~FlowGraphSource() = default;
352 
353     FlowGraphPortFloatOutput output;
354 };
355 
356 /***************************************************************************/
357 
358 /**
359  * Base class for an edge node in a graph that has no upstream nodes.
360  * It outputs data but does not consume data.
361  * By default, it will read its data from an external buffer.
362  */
363 class FlowGraphSourceBuffered : public FlowGraphSource {
364 public:
FlowGraphSourceBuffered(int32_t channelCount)365     explicit FlowGraphSourceBuffered(int32_t channelCount)
366             : FlowGraphSource(channelCount) {}
367 
368     virtual ~FlowGraphSourceBuffered() = default;
369 
370     /**
371      * Specify buffer that the node will read from.
372      *
373      * @param data TODO Consider using std::shared_ptr.
374      * @param numFrames
375      */
setData(const void * data,int32_t numFrames)376     void setData(const void *data, int32_t numFrames) {
377         mData = data;
378         mSizeInFrames = numFrames;
379         mFrameIndex = 0;
380     }
381 
382 protected:
383     const void *mData = nullptr;
384     int32_t     mSizeInFrames = 0; // number of frames in mData
385     int32_t     mFrameIndex = 0; // index of next frame to be processed
386 };
387 
388 /***************************************************************************/
389 /**
390  * Base class for an edge node in a graph that has no downstream nodes.
391  * It consumes data but does not output data.
392  * This graph will be executed when data is read() from this node
393  * by pulling data from upstream nodes.
394  */
395 class FlowGraphSink : public FlowGraphNode {
396 public:
FlowGraphSink(int32_t channelCount)397     explicit FlowGraphSink(int32_t channelCount)
398             : input(*this, channelCount) {
399     }
400 
401     virtual ~FlowGraphSink() = default;
402 
403     FlowGraphPortFloatInput input;
404 
405     /**
406      * Dummy processor. The work happens in the read() method.
407      *
408      * @param numFrames
409      * @return number of frames actually processed
410      */
onProcess(int32_t numFrames)411     int32_t onProcess(int32_t numFrames) override {
412         return numFrames;
413     }
414 
415     virtual int32_t read(void *data, int32_t numFrames) = 0;
416 
417 protected:
418     /**
419      * Pull data through the graph using this nodes last callCount.
420      * @param numFrames
421      * @return
422      */
423     int32_t pullData(int32_t numFrames);
424 };
425 
426 /***************************************************************************/
427 /**
428  * Base class for a node that has an input and an output with the same number of channels.
429  * This may include traditional filters, eg. FIR, but also include
430  * any processing node that converts input to output.
431  */
432 class FlowGraphFilter : public FlowGraphNode {
433 public:
FlowGraphFilter(int32_t channelCount)434     explicit FlowGraphFilter(int32_t channelCount)
435             : input(*this, channelCount)
436             , output(*this, channelCount) {
437     }
438 
439     virtual ~FlowGraphFilter() = default;
440 
441     FlowGraphPortFloatInput input;
442     FlowGraphPortFloatOutput output;
443 };
444 
445 } /* namespace flowgraph */
446 } /* namespace FLOWGRAPH_OUTER_NAMESPACE */
447 
448 #endif /* FLOWGRAPH_FLOW_GRAPH_NODE_H */
449