1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #ifndef TENSORFLOW_CORE_COMMON_RUNTIME_STEP_STATS_COLLECTOR_H_
16 #define TENSORFLOW_CORE_COMMON_RUNTIME_STEP_STATS_COLLECTOR_H_
17 
18 #include <memory>
19 #include <unordered_map>
20 #include <vector>
21 #include "tensorflow/core/framework/step_stats.pb.h"
22 #include "tensorflow/core/lib/gtl/inlined_vector.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/mutex.h"
25 #include "tensorflow/core/platform/thread_annotations.h"
26 #include "tensorflow/core/platform/types.h"
27 
28 namespace tensorflow {
29 
30 class Allocator;
31 class AllocatorMemoryUsed;
32 class CostModelManager;
33 class Graph;
34 class NodeDef;
35 class NodeExecStats;
36 class OpKernelContext;
37 class StepStats;
38 class StepStatsCollector;
39 class Tensor;
40 class TrackingAllocator;
41 
42 // Statistics collection interface for individual node execution.
43 //
44 // See `NodeExecStatsWrapper` for a concrete implementation of this interface
45 // that interfaces with the `Session` layer.
46 class NodeExecStatsInterface {
47  public:
~NodeExecStatsInterface()48   virtual ~NodeExecStatsInterface() {}
49 
50   // Called when the statistics collection for the node has finished. Once this
51   // method is called, the caller should not make assumptions about the validity
52   // of this object.
53   virtual void Done(const string& device) = 0;
54 
55   // Called immediately after this node starts being processed by the executor.
56   virtual void RecordExecutorStarted() = 0;
57 
58   // Called immediately before this node's `Compute()` or `ComputeAsync()`
59   // method is called.
60   virtual void RecordComputeStarted() = 0;
61 
62   // Called immediately after this node's `Compute()` method returned (or, for
63   // asynchronous operations, the callback passed to its `ComputeAsync()` method
64   // was called).
65   virtual void RecordComputeEnded() = 0;
66 
67   // Called immediately after this executor finishes processing this node.
68   virtual void RecordExecutorEnded() = 0;
69 
70   // Returns `true` if this object should track memory allocations.
71   virtual bool TrackAllocations() const = 0;
72 
73   // Records information about the memory allocated during the execution of this
74   // node.
75   //
76   // Takes ownership of any `TrackingAllocator` objects stored in `ctx`.
77   virtual void SetMemory(OpKernelContext* ctx) = 0;
78 
79   // Records information about the tensor produced by this node at the given
80   // output slot.
81   virtual void SetOutput(int slot, const Tensor* tensor) = 0;
82 
83   // Records the absolute time in nanoseconds at which this node became
84   // runnable (i.e. was scheduled for execution).
85   virtual void SetScheduled(int64 nanos) = 0;
86 };
87 
88 // Wraps NodeExecStats and adds allocation to it.
89 class NodeExecStatsWrapper : public NodeExecStatsInterface {
90  public:
91   // Does not take ownership of `node` or `step_stats_collector`.
92   NodeExecStatsWrapper(const NodeDef* node,
93                        StepStatsCollector* step_stats_collector);
94 
95   // Takes ownership of 'stats' but not `node` or `step_stats_collector`.
96   NodeExecStatsWrapper(std::unique_ptr<NodeExecStats> stats,
97                        const NodeDef* node,
98                        StepStatsCollector* step_stats_collector);
99 
100   // Destructor calls Finalize() to release the TrackingAllocators.
~NodeExecStatsWrapper()101   ~NodeExecStatsWrapper() override { Finalize(); }
102 
103   void Done(const string& device) override;
104   void RecordExecutorStarted() override;
105   void RecordComputeStarted() override;
106   void RecordComputeEnded() override;
107   void RecordExecutorEnded() override;
TrackAllocations()108   bool TrackAllocations() const override { return true; }
109   void SetMemory(OpKernelContext* ctx) override;
110   void SetOutput(int slot, const Tensor* tensor) override;
111   void SetScheduled(int64 nanos) override;
112 
113  private:
114   friend class StepStatsCollector;
115 
stats()116   NodeExecStats* stats() { return stats_.get(); }
117 
118   // Populates stats_ and releases TrackingAllocator.
119   void Finalize();
120 
121   // Does not take ownership of the `allocator`.
122   // Takes ownership of `tracking_allocator`.
123   void AddAllocation(Allocator* allocator,
124                      TrackingAllocator* tracking_allocator);
125 
126   gtl::InlinedVector<std::pair<AllocatorMemoryUsed*, TrackingAllocator*>, 2>
127       allocations_;
128   std::unique_ptr<NodeExecStats> stats_;
129   const NodeDef* const node_;                       // Not owned.
130   StepStatsCollector* const step_stats_collector_;  // Not owned.
131 };
132 
133 // Statistics collection interface for step execution.
134 //
135 // See `StepStatsCollector` for a concrete implementation of this interface
136 // that interfaces with the `Session` layer.
137 class StepStatsCollectorInterface {
138  public:
~StepStatsCollectorInterface()139   virtual ~StepStatsCollectorInterface() {}
140 
141   // Creates an instance of `NodeExecStatsInterface` that should be used for
142   // collecting statistics about individual node execution.
143   virtual NodeExecStatsInterface* CreateNodeExecStats(const NodeDef* node) = 0;
144 
145   // Generates a string reporting the currently used memory based
146   // on ResourceExhausted OOM `err` message.
147   // `err` message needs to contain device name and allocator name, e.g.:
148   // "ResourceExhaustedError: OOM when allocating tensor ...
149   // on /job:localhost/replica:0/task:0/device:GPU:0 by allocator GPU_0_bfc"
150   virtual string ReportAllocsOnResourceExhausted(const string& err) = 0;
151 };
152 
153 // StepStatsCollector manages the collection of a StepStats object.
154 // The StepStats object holds multiple DeviceStats.
155 // Each DeviceStats object holds multiple NodeExecStats.
156 class StepStatsCollector : public StepStatsCollectorInterface {
157  public:
158   // Does not take ownership of `step_stats`.
159   explicit StepStatsCollector(StepStats* step_stats);
160 
161   // BuildCostModel builds or updates a CostModel managed by cost_model_manager,
162   // using the currently collected DeviceStats associated with the devices in
163   // device_map.
164   void BuildCostModel(
165       CostModelManager* cost_model_manager,
166       const std::unordered_map<string, const Graph*>& device_map);
167 
168   // Saves node statistics to the DeviceStats object associated with device.
169   // Should be called before Finalize.
170   void Save(const string& device, NodeExecStats* node_stats_pb);
171   void Save(const string& device, NodeExecStatsWrapper* node_stats);
172 
173   // Saves thread name.
174   void SaveThreadName(const string& device, const uint32 thread_id,
175                       const string& thread_name);
176 
177   NodeExecStatsInterface* CreateNodeExecStats(const NodeDef* node) override;
178   string ReportAllocsOnResourceExhausted(const string& err) override;
179 
180   // The following 2 Finalize methods populate the StepStats passed
181   // from the constructor. Calling it more than once won't have any effect.
182   // User shouldn't call Save() methods after Finalize.
183   void Finalize();
184   // swaps the content of StepStats* from constructor with 'ss'.
185   void FinalizeAndSwap(StepStats* step_stats);
186 
187  private:
188   // TODO(suharshs): Make this configurable if its not possible to find a value
189   // that works for all cases.
190   static constexpr uint64 kMaxCollectedNodes = 1 << 20;
191 
192   typedef std::vector<std::unique_ptr<NodeExecStatsWrapper>> NodeStatsVector;
193   typedef std::unordered_map<uint32, string> ThreadNamesMap;
194 
195   void FinalizeInternal() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
196 
197   mutex mu_;
198   bool finalized_ TF_GUARDED_BY(mu_);
199   std::unordered_map<string, NodeStatsVector> dev_stats_ TF_GUARDED_BY(mu_);
200   std::unordered_map<string, ThreadNamesMap> thread_names_ TF_GUARDED_BY(mu_);
201   StepStats* step_stats_ TF_GUARDED_BY(mu_);
202   uint64 collected_nodes_ TF_GUARDED_BY(mu_) = 0;
203 };
204 
205 }  // namespace tensorflow
206 
207 #endif  // TENSORFLOW_CORE_COMMON_RUNTIME_STEP_STATS_COLLECTOR_H_
208