1 /* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #ifndef TENSORFLOW_CORE_FRAMEWORK_MODEL_H_
16 #define TENSORFLOW_CORE_FRAMEWORK_MODEL_H_
17 
#include <atomic>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <string>
// TODO(b/114492873): Move this include into core/platform.
#include <thread>  // NOLINT
#include <utility>
#include <vector>
25 
26 #include "tensorflow/core/framework/types.h"
27 #include "tensorflow/core/lib/gtl/cleanup.h"
28 #include "tensorflow/core/lib/gtl/map_util.h"
29 #include "tensorflow/core/lib/random/random.h"
30 #include "tensorflow/core/platform/cpu_info.h"
31 #include "tensorflow/core/platform/env.h"
32 
33 namespace tensorflow {
34 namespace data {
35 namespace model {
36 
// Sentinel value requesting auto-tuning: a `SharedState` constructed with this
// value is marked as tunable.
constexpr int kAutoTune{-1};
39 
40 // Represents thread-safe state that can be shared between an input pipeline and
41 // the performance model.
42 struct SharedState {
43  public:
SharedStateSharedState44   SharedState(int64 value, std::shared_ptr<mutex> mu,
45               std::shared_ptr<condition_variable> cond_var)
46       : value(value),
47         mu(std::move(mu)),
48         cond_var(std::move(cond_var)),
49         tunable(value == kAutoTune) {}
50 
51   int64 value;
52   std::shared_ptr<mutex> mu;
53   std::shared_ptr<condition_variable> cond_var;
54   const bool tunable;
55 };
56 
57 // Represents a parameter.
58 struct Parameter {
ParameterParameter59   Parameter(const string& name, std::shared_ptr<SharedState> state, int64 min,
60             int64 max)
61       : name(name),
62         value(state->value),
63         min(min),
64         max(max),
65         state(std::move(state)) {}
66 
67   // Human-readable name of the parameter.
68   string name;
69 
70   // Identifies the model value of the parameter. This can be different from
71   // the actual value (e.g. during optimization search).
72   int64 value;
73 
74   // Identifies the minimum value of the parameter.
75   int64 min;
76 
77   // Identifies the maximum value of the parameter.
78   int64 max;
79 
80   // Shared state of the parameter.
81   std::shared_ptr<SharedState> state;
82 };
83 
84 std::shared_ptr<Parameter> MakeParameter(const string& name,
85                                          std::shared_ptr<SharedState> state,
86                                          int64 min, int64 max);
87 
88 // Abstract representation of a TensorFlow input pipeline node. It collects
89 // information about inputs to this node, processing time spent executing the
90 // node logic, number of elements produced by the node, various other
91 // information (e.g. batch size or execution parallelism).
92 //
93 // Developers of tf.data transformations are not expected to interact with
94 // this class directly. Boiler plate code for creating the abstract
95 // representation of the input pipeline and collecting common information has
96 // been added to the implementation of `DatasetBase` and `DatasetBaseIterator`
97 // respectively.
98 //
99 // In addition, `DatasetBaseIterator` provides wrappers that can be used for
100 // transformation-specific information collection. The `SetMetadata` wrapper
101 // can be used to pass arbitrary metadata to the modeling framework, while the
102 // `StartWork` and `StopWork` wrappers should be used to correctly account for
103 // processing time of multi-threaded transformation that yield the CPU; such
104 // transformations should invoke `StartWork()` when a transformation thread
105 // starts executing (e.g. when created or woken up) and `StopWork()` when a
106 // transformation thread stops executing (e.g. when returning or waiting).
107 class Node {
108  public:
109   // Arguments for `Node` constructor.
110   struct Args {
111     int64 id;
112     string name;
113     std::shared_ptr<Node> output;
114   };
115 
116   using Factory = std::function<std::shared_ptr<Node>(Args)>;
117 
Node(Args args)118   explicit Node(Args args)
119       : id_(args.id), name_(args.name), output_(args.output.get()) {}
120 
121   // Increments the bytes buffered by the given delta.
add_buffered_bytes(int64 delta)122   void add_buffered_bytes(int64 delta) LOCKS_EXCLUDED(mu_) {
123     mutex_lock l(mu_);
124     buffered_bytes_ += delta;
125   }
126 
127   // Adds an input.
add_input(std::shared_ptr<Node> node)128   void add_input(std::shared_ptr<Node> node) LOCKS_EXCLUDED(mu_) {
129     mutex_lock l(mu_);
130     inputs_.push_back(node);
131   }
132 
133   // Increments the aggregate processing time by the given delta.
add_processing_time(int64 delta)134   void add_processing_time(int64 delta) LOCKS_EXCLUDED(mu_) {
135     mutex_lock l(mu_);
136     processing_time_ += delta;
137   }
138 
139   // Returns the number of bytes stored in this node's buffer.
buffered_bytes()140   int64 buffered_bytes() const LOCKS_EXCLUDED(mu_) {
141     tf_shared_lock l(mu_);
142     return buffered_bytes_;
143   }
144 
145   // Indicates whether the node has tunable parameters.
has_tunable_parameters()146   bool has_tunable_parameters() const LOCKS_EXCLUDED(mu_) {
147     tf_shared_lock l(mu_);
148     for (const auto& pair : parameters_) {
149       if (pair.second->state->tunable) return true;
150     }
151     return false;
152   }
153 
154   // Returns the unique node ID.
id()155   int64 id() const LOCKS_EXCLUDED(mu_) { return id_; }
156 
157   // Returns the node inputs.
inputs()158   std::list<std::shared_ptr<Node>> inputs() const LOCKS_EXCLUDED(mu_) {
159     tf_shared_lock l(mu_);
160     return inputs_;
161   }
162 
163   // Returns the node name.
name()164   const string& name() const { return name_; }
165 
166   // Returns the number of elements produced by the node.
num_elements()167   int64 num_elements() const LOCKS_EXCLUDED(mu_) {
168     tf_shared_lock l(mu_);
169     return num_elements_;
170   }
171 
172   // Returns the node output.
output()173   Node* output() const { return output_; }
174 
175   // Returns the aggregate processing time.
processing_time()176   int64 processing_time() const LOCKS_EXCLUDED(mu_) {
177     tf_shared_lock l(mu_);
178     return processing_time_;
179   }
180 
181   // Records that the node produced an element.
record_element()182   void record_element() LOCKS_EXCLUDED(mu_) {
183     mutex_lock l(mu_);
184     num_elements_++;
185   }
186 
187   // Records that a node thread has started executing.
record_start(int64 time_nanos)188   void record_start(int64 time_nanos) LOCKS_EXCLUDED(mu_) {
189     mutex_lock l(mu_);
190     work_start_[std::this_thread::get_id()] = time_nanos;
191   }
192 
193   // Records that a node thread has stopped executing.
record_stop(int64 time_nanos)194   void record_stop(int64 time_nanos) LOCKS_EXCLUDED(mu_) {
195     mutex_lock l(mu_);
196     std::thread::id tid = std::this_thread::get_id();
197     auto iter = work_start_.find(tid);
198     if (iter != work_start_.end()) {
199       processing_time_ += time_nanos - iter->second;
200       work_start_.erase(iter);
201     } else {
202       LOG(WARNING)
203           << "Encountered a stop event that was not preceded by a start event.";
204     }
205   }
206 
207   // Removes an input.
remove_input(std::shared_ptr<Node> input)208   void remove_input(std::shared_ptr<Node> input) LOCKS_EXCLUDED(mu_) {
209     mutex_lock l(mu_);
210     inputs_.remove(input);
211   }
212 
213   // Collects tunable parameters in the subtree rooted in this node.
CollectTunableParameters(std::vector<std::shared_ptr<Parameter>> * parameters)214   void CollectTunableParameters(
215       std::vector<std::shared_ptr<Parameter>>* parameters) const
216       LOCKS_EXCLUDED(mu_) {
217     tf_shared_lock l(mu_);
218     for (auto& pair : parameters_) {
219       if (pair.second->state->tunable) {
220         parameters->push_back(pair.second);
221       }
222     }
223     for (auto& input : inputs_) {
224       input->CollectTunableParameters(parameters);
225     }
226   }
227 
228   // Returns the per-element output time for this node.
OutputTime(std::vector<int64> * input_times)229   int64 OutputTime(std::vector<int64>* input_times) const LOCKS_EXCLUDED(mu_) {
230     tf_shared_lock l(mu_);
231     return OutputTimeLocked(input_times);
232   }
233 
234   // Returns the per-element processing time spent in the subtree rooted in
235   // this node.
ProcessingTime()236   int64 ProcessingTime() const LOCKS_EXCLUDED(mu_) {
237     tf_shared_lock l(mu_);
238     return ProcessingTimeLocked();
239   }
240 
241   // Returns a copy of this node, making a deep copy of its inputs and a
242   // shallow copy of its tunable parameters.
243   //
244   // The purpose for this method is to allow the model optimization logic to
245   // operate over immutable state while allowing concurrent model updates.
Snapshot(std::shared_ptr<Node> output)246   std::shared_ptr<Node> Snapshot(std::shared_ptr<Node> output)
247       LOCKS_EXCLUDED(mu_) {
248     tf_shared_lock l(mu_);
249     std::shared_ptr<Node> result = Clone(output);
250     {
251       mutex_lock l2(result->mu_);
252       result->buffered_bytes_ = buffered_bytes_;
253       result->processing_time_ = processing_time_;
254       result->num_elements_ = num_elements_;
255       result->parameters_ = parameters_;
256     }
257     for (auto& input : inputs_) {
258       result->add_input(input->Snapshot(result));
259     }
260     return result;
261   }
262 
263  protected:
264   // Creates a clone of this node.
265   virtual std::shared_ptr<Node> Clone(std::shared_ptr<Node> output) const
266       SHARED_LOCKS_REQUIRED(mu_) = 0;
267 
268   // Returns the per-element processing time spent in this node.
NanosPerElementLocked()269   int64 NanosPerElementLocked() const SHARED_LOCKS_REQUIRED(mu_) {
270     if (num_elements_ == 0) {
271       return 0;
272     }
273     return static_cast<int64>(static_cast<double>(processing_time_) /
274                               static_cast<double>(num_elements_));
275   }
276 
277   // Returns the sum of per-element output time for the inputs of this node.
OutputTimeForInputs(std::vector<int64> * input_times)278   int64 OutputTimeForInputs(std::vector<int64>* input_times) const
279       SHARED_LOCKS_REQUIRED(mu_) {
280     int64 sum = 0;
281     for (auto& input : inputs_) {
282       sum += input->OutputTime(input_times);
283     }
284     return sum;
285   }
286 
287   // Returns the per-element output time for this node.
288   virtual int64 OutputTimeLocked(std::vector<int64>* input_times) const
289       SHARED_LOCKS_REQUIRED(mu_) = 0;
290 
291   // Returns the sum of per-element processing time for the inputs of this node.
292   //
293   // TODO(jsimsa): use processing time history as a prior for future inputs
ProcessingTimeForInputs()294   int64 ProcessingTimeForInputs() const SHARED_LOCKS_REQUIRED(mu_) {
295     int64 sum = 0;
296     for (auto& input : inputs_) {
297       sum += input->ProcessingTime();
298     }
299     return sum;
300   }
301 
302   // Returns the per-element processing time spent in the subtree rooted in
303   // this node.
304   virtual int64 ProcessingTimeLocked() const SHARED_LOCKS_REQUIRED(mu_) = 0;
305 
306   mutable mutex mu_;
307   const int64 id_;
308   const string name_;
309   int64 buffered_bytes_ GUARDED_BY(mu_) = 0;
310   int64 processing_time_ GUARDED_BY(mu_) = 0;
311   int64 num_elements_ GUARDED_BY(mu_) = 0;
312   std::map<std::thread::id, int64> work_start_ GUARDED_BY(mu_);
313   std::map<string, std::shared_ptr<Parameter>> parameters_ GUARDED_BY(mu_);
314   std::list<std::shared_ptr<Node>> inputs_ GUARDED_BY(mu_);
315 
316   // The reference to the output node is not owned so that deletion of a
317   // node results in recursive deletion of the subtree rooted in the node.
318   Node* const output_;
319 };
320 
321 // InterleaveMany is used to model datasets whose inputs are used to create
322 // datasets whose elements are then interleaved.
323 std::shared_ptr<Node> MakeInterleaveManyNode(Node::Args args);
324 
325 // AsyncInterleaveMany nodes are the asynchronous version of InterleaveMany
326 // nodes.
327 std::shared_ptr<Node> MakeAsyncInterleaveManyNode(
328     Node::Args args, std::vector<std::shared_ptr<Parameter>> parameters);
329 
330 // KnownMany nodes model datasets that synchronously consume known number of
331 // input element per output element.
332 std::shared_ptr<Node> MakeKnownRatioNode(Node::Args args, double ratio);
333 
334 // AsyncKnownRatio nodes are the asynchronous version of KnownRate nodes.
335 std::shared_ptr<Node> MakeAsyncKnownRatioNode(
336     Node::Args args, double ratio,
337     std::vector<std::shared_ptr<Parameter>> parameters);
338 
339 // Source nodes represent data sources.
340 std::shared_ptr<Node> MakeSourceNode(Node::Args args);
341 
342 // UnknownMany nodes represent datasets that synchronously consume an
343 // unknown number of input elements per output.
344 //
345 // Unlike KnownRatio nodes which expect the ratio between inputs and outputs is
346 // specified as a parameter, UnknownRatio estimates the ratio empirically.
347 std::shared_ptr<Node> MakeUnknownRatioNode(Node::Args args);
348 
349 // Unknown nodes represent datasets for which we do not have a model. It acts
350 // as pass-through between inputs and output.
351 std::shared_ptr<Node> MakeUnknownNode(Node::Args args);
352 
353 // Abstract representation of a TensorFlow input pipeline that can be used
354 // for collecting runtime information and optimizing performance. It collects
355 // runtime information about execution of the input pipeline that is used to
356 // create a performance model, which is in turn used to identify optimal values
357 // of tunable parameters.
358 //
359 // Developers of tf.data transformations are not expected to interact with this
360 // class directly. Boiler plate code for creating the abstract representation of
361 // the input pipeline and collecting runtime information has been added to the
362 // implementation of `DatasetBase` and `DatasetBaseIterator` respectively.
363 class Model {
364  public:
365   using NodeHook = std::function<void(std::shared_ptr<Node>)>;
366 
367   // Creates a new model.
368   //
369   // The `remove_node_hook` argument can be used to specify functionality that
370   // should be invoked before a node is removed from the model. The hook can be
371   // used for dependency injection -- to allow the model to invoke functionality
372   // from modules that it could not depend on statically.
Model(NodeHook remove_node_hook)373   Model(NodeHook remove_node_hook)
374       : collect_resource_usage_(false),
375         remove_node_hook_(std::move(remove_node_hook)) {
376     DCHECK(remove_node_hook_ != nullptr);
377   }
378 
379   // Indicates whether to collect resource usage.
collect_resource_usage()380   bool collect_resource_usage() const { return collect_resource_usage_; }
381 
382   // Adds a node with the given name and given output.
383   std::shared_ptr<Node> AddNode(Node::Factory factory, const string& name,
384                                 const string& output_name) LOCKS_EXCLUDED(mu_);
385 
386   // Increments the processing time for the given node..
387   void AddProcessingTime(const string& name, int64 delta) LOCKS_EXCLUDED(mu_);
388 
389   // Runs optimization.
390   void Optimize(int64 cpu_budget) LOCKS_EXCLUDED(mu_);
391 
392   // Records that a node has produced an element.
393   void RecordElement(const string& name) LOCKS_EXCLUDED(mu_);
394 
395   // Records that the given node has started work. If `stop_output` is set, it
396   // also records that the output of the given node has stopped work.
397   void RecordStart(const string& name, bool stop_output) LOCKS_EXCLUDED(mu_);
398 
399   // Records that the given node has stopped work. If `stop_output` is set, it
400   // also records that the output of the given node has started work.
401   void RecordStop(const string& name, bool start_output) LOCKS_EXCLUDED(mu_);
402 
403   // Removes the given node.
404   void RemoveNode(const string& name) LOCKS_EXCLUDED(mu_);
405 
406  private:
407   // Collects tunable parameters in the tree rooted in the given node.
408   std::vector<std::shared_ptr<Parameter>> CollectTunableParameters(
409       std::shared_ptr<Node> node);
410 
411   // Collects the output time for the given node.
412   int64 OutputTime(std::shared_ptr<Node> node);
413 
414   // Collects the processing time for the given node.
415   int64 ProcessingTime(std::shared_ptr<Node> node);
416 
417   // Used for coordination between different input pipeline threads. Exclusive
418   // access is required only when adding or removing nodes. Concurrent access to
419   // existing nodes is protected by a node mutex.
420   mutex mu_;
421   int64 id_counter_ GUARDED_BY(mu_) = 1;
422   std::shared_ptr<Node> output_ GUARDED_BY(mu_);
423   std::map<string, std::shared_ptr<Node>> lookup_table_ GUARDED_BY(mu_);
424 
425   // Indicates whether the modeling framework should collect resource usage
426   // (e.g. CPU, memory). The logic for collecting this information assumes that
427   // the collection is not repeatedly disabled and enabled. As a consequence,
428   // the implementation starts collecting resource usage when it encounters a
429   // tunable parameter (because the information is used for for tuning the value
430   // of the parameter) and never stops.
431   std::atomic<bool> collect_resource_usage_;
432 
433   // A hook invoked immediately before a node is removed from the model.
434   const NodeHook remove_node_hook_;
435 };
436 
437 }  // namespace model
438 }  // namespace data
439 }  // namespace tensorflow
440 
441 #endif  // TENSORFLOW_CORE_FRAMEWORK_MODEL_H_
442