1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/profiler/convert/xplane_to_op_metrics_db.h"
17 
18 #include <algorithm>
19 #include <memory>
20 #include <vector>
21 
22 #include "absl/algorithm/container.h"
23 #include "absl/container/flat_hash_map.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/strings/string_view.h"
26 #include "absl/types/optional.h"
27 #include "tensorflow/core/lib/gtl/map_util.h"
28 #include "tensorflow/core/platform/logging.h"
29 #include "tensorflow/core/platform/types.h"
30 #include "tensorflow/core/profiler/convert/op_metrics_db_combiner.h"
31 #include "tensorflow/core/profiler/convert/op_stack.h"
32 #include "tensorflow/core/profiler/protobuf/op_metrics.pb.h"
33 #include "tensorflow/core/profiler/protobuf/xplane.pb.h"
34 #include "tensorflow/core/profiler/utils/cost_utils.h"
35 #include "tensorflow/core/profiler/utils/op_metrics_db_utils.h"
36 #include "tensorflow/core/profiler/utils/op_utils.h"
37 #include "tensorflow/core/profiler/utils/tf_op_utils.h"
38 #include "tensorflow/core/profiler/utils/tf_xplane_visitor.h"
39 #include "tensorflow/core/profiler/utils/timespan.h"
40 #include "tensorflow/core/profiler/utils/trace_utils.h"
41 #include "tensorflow/core/profiler/utils/xplane_schema.h"
42 #include "tensorflow/core/profiler/utils/xplane_visitor.h"
43 
44 namespace tensorflow {
45 namespace profiler {
46 namespace {
47 
48 // Type of a TensorFlow Op activity, which is either beginning or ending an Op.
49 enum TfActivityType { kTfOpBegin, kTfOpEnd };
50 
51 // Instant activity representing the begin or end of a host-side TF Op.
52 struct TfActivity {
53   // The timestamp in picoseconds when this activity happened.
54   uint64 timestamp_ps;
55   // The ID of this Op.
56   uint32 tf_op_id;
57   // Type of this activity.
58   TfActivityType activity_type;
59   // Full TF op name and type of this activity (backed by XEvent::name).
60   TfOp tf_op;
61   // Whether it is eagerly executed.
62   bool is_eager;
63 };
64 
65 // TF Op metrics stored as element in OpStack.
66 struct TfOpInfo {
TfOpInfotensorflow::profiler::__anon3ba6d7a60111::TfOpInfo67   explicit TfOpInfo(uint64 ts) : start_timestamp_ps(ts) {}
68 
69   // Start timestamp in picoseconds.
70   uint64 start_timestamp_ps;
71   // Children duration in picoseconds.
72   uint64 children_duration_ps = 0;
73 };
74 
75 // Processes a TF-activity on particular core.
ProcessOneTfActivity(const TfActivity & activity,OpStack<TfOpInfo> * tf_op_stack,TfMetricsDbData * tf_metrics_data)76 void ProcessOneTfActivity(const TfActivity& activity,
77                           OpStack<TfOpInfo>* tf_op_stack,
78                           TfMetricsDbData* tf_metrics_data) {
79   uint32 tf_op_id = activity.tf_op_id;
80   switch (activity.activity_type) {
81     case kTfOpBegin: {
82       tf_op_stack->Push(tf_op_id,
83                         absl::make_unique<TfOpInfo>(activity.timestamp_ps));
84       break;
85     }
86     case kTfOpEnd: {
87       std::unique_ptr<TfOpInfo> info = tf_op_stack->Pop(tf_op_id);
88       if (info == nullptr) {
89         // This happens if TraceMes overlap.
90         VLOG(1) << "No begin event found for TF activity id=" << tf_op_id
91                 << " name=" << activity.tf_op.name
92                 << " type=" << activity.tf_op.type;
93         break;
94       }
95       Timespan tf_op_span =
96           PicoSpan(info->start_timestamp_ps, activity.timestamp_ps);
97       tf_metrics_data->tf_metrics_db_builder.EnterOp(
98           activity.tf_op.name, activity.tf_op.type, activity.is_eager,
99           tf_op_span.duration_ps(), info->children_duration_ps);
100       TfOpInfo* parent_info = tf_op_stack->Top();
101       if (parent_info != nullptr) {
102         parent_info->children_duration_ps += tf_op_span.duration_ps();
103       }
104       if (IsInfeedEnqueueOp(activity.tf_op.type)) {
105         if (tf_metrics_data->last_infeed_enq_duration_ps > 0) {
106           DCHECK(tf_metrics_data->last_infeed_enq_start_timestamp_ps <=
107                  info->start_timestamp_ps);
108           uint64 start_timestamps_ps_diff =
109               info->start_timestamp_ps -
110               tf_metrics_data->last_infeed_enq_start_timestamp_ps;
111           tf_metrics_data->tf_metrics_db_builder.UpdateHostInfeedEnqInfo(
112               tf_metrics_data->last_infeed_enq_duration_ps,
113               start_timestamps_ps_diff);
114         }
115         tf_metrics_data->last_infeed_enq_start_timestamp_ps =
116             info->start_timestamp_ps;
117         tf_metrics_data->last_infeed_enq_duration_ps = tf_op_span.duration_ps();
118       }
119       break;
120     }
121   }
122 }
123 
124 // Processes all TF-activities on the given core.
ProcessTfActivities(std::vector<TfActivity> * tf_activities,TfMetricsDbData * tf_metrics_db_data)125 void ProcessTfActivities(std::vector<TfActivity>* tf_activities,
126                          TfMetricsDbData* tf_metrics_db_data) {
127   if (tf_activities->empty()) return;
128   absl::c_stable_sort(*tf_activities,
129                       [](const TfActivity& a, const TfActivity& b) {
130                         return a.timestamp_ps < b.timestamp_ps;
131                       });
132   OpStack<TfOpInfo> tf_op_stack;
133   for (const auto& tf_activity : *tf_activities) {
134     ProcessOneTfActivity(tf_activity, &tf_op_stack, tf_metrics_db_data);
135   }
136   tf_metrics_db_data->tf_metrics_db.set_total_time_ps(
137       tf_activities->back().timestamp_ps - tf_activities->front().timestamp_ps);
138 }
139 
CollectTfActivities(const XLineVisitor & line,const absl::flat_hash_map<int64,TfOp> & tf_ops,std::vector<TfActivity> * tf_activities)140 void CollectTfActivities(const XLineVisitor& line,
141                          const absl::flat_hash_map<int64, TfOp>& tf_ops,
142                          std::vector<TfActivity>* tf_activities) {
143   uint32 tf_op_id = 0;
144   tf_activities->reserve(line.NumEvents() * 2);
145   line.ForEachEvent([&tf_ops, &tf_op_id,
146                      &tf_activities](const XEventVisitor& event) {
147     const TfOp* tf_op = gtl::FindOrNull(tf_ops, event.Id());
148     if (tf_op != nullptr) {
149       ++tf_op_id;
150       bool is_eager = false;
151       if (absl::optional<XStatVisitor> stat =
152               event.GetStat(StatType::kIsEager)) {
153         is_eager = stat->IntValue();
154       }
155       Timespan span = event.GetTimespan();
156       tf_activities->push_back(
157           {span.begin_ps(), tf_op_id, kTfOpBegin, *tf_op, is_eager});
158       tf_activities->push_back(
159           {span.end_ps(), tf_op_id, kTfOpEnd, *tf_op, is_eager});
160     }
161   });
162 }
163 
164 }  // namespace
165 
CollectTfOpsFromHostThreadsXPlane(const XPlane & host_trace)166 absl::flat_hash_map<int64, TfOp> CollectTfOpsFromHostThreadsXPlane(
167     const XPlane& host_trace) {
168   absl::flat_hash_map<int64, TfOp> tf_ops;
169   for (const auto& id_metadata : host_trace.event_metadata()) {
170     const XEventMetadata& metadata = id_metadata.second;
171     // On the host, we have added some user-specified TraceMe's in addition to
172     // the TraceMe's added to every TensorFlow op by the system. These
173     // user-inserted TraceMe's have "unknown" type. We don't count them in
174     // Tf-stats.
175     TfOp tf_op = ParseTfOpFullname(metadata.name());
176     if (tf_op.category != Category::kUnknown) {
177       tf_ops.try_emplace(metadata.id(), tf_op);
178     }
179   }
180   return tf_ops;
181 }
182 
ConvertHostThreadsXLineToTfMetricsDbData(const XLineVisitor & line,const absl::flat_hash_map<int64,TfOp> & tf_ops)183 TfMetricsDbData ConvertHostThreadsXLineToTfMetricsDbData(
184     const XLineVisitor& line, const absl::flat_hash_map<int64, TfOp>& tf_ops) {
185   TfMetricsDbData tf_metrics_db_data;
186   if (!tf_ops.empty()) {
187     std::vector<TfActivity> tf_activities;
188     CollectTfActivities(line, tf_ops, &tf_activities);
189     ProcessTfActivities(&tf_activities, &tf_metrics_db_data);
190   }
191   return tf_metrics_db_data;
192 }
193 
ConsumeTfMetricsDbData(TfMetricsDbData src,OpMetricsDbCombiner * dst)194 void ConsumeTfMetricsDbData(TfMetricsDbData src, OpMetricsDbCombiner* dst) {
195   AddIdleOp(&src.tf_metrics_db);
196   dst->Combine(src.tf_metrics_db);
197   src.tf_metrics_db.Clear();
198 }
199 
ConvertHostThreadsXPlaneToOpMetricsDb(const XPlane & host_trace)200 OpMetricsDb ConvertHostThreadsXPlaneToOpMetricsDb(const XPlane& host_trace) {
201   absl::flat_hash_map<int64, TfOp> tf_ops =
202       CollectTfOpsFromHostThreadsXPlane(host_trace);
203   OpMetricsDb result;
204   OpMetricsDbCombiner combiner(&result);
205   XPlaneVisitor plane = CreateTfXPlaneVisitor(&host_trace);
206   plane.ForEachLine([&tf_ops, &combiner](const XLineVisitor& line) {
207     ConsumeTfMetricsDbData(
208         ConvertHostThreadsXLineToTfMetricsDbData(line, tf_ops), &combiner);
209   });
210   return result;
211 }
212 
ConvertDeviceTraceXPlaneToOpMetricsDb(const XPlane & device_trace)213 OpMetricsDb ConvertDeviceTraceXPlaneToOpMetricsDb(const XPlane& device_trace) {
214   OpMetricsDb result;
215   DeviceOpMetricsDbBuilder device_op_metrics_db_builder(&result);
216 
217   int64 first_op_offset_ps = kint64max;
218   int64 last_op_offset_ps = 0;
219 
220   TfOpRoofLineCostEstimator op_level_cost_estimator;
221   XPlaneVisitor plane = CreateTfXPlaneVisitor(&device_trace);
222   plane.ForEachLine([&](const XLineVisitor& line) {
223     if (IsDerivedThreadId(line.Id())) return;
224     line.ForEachEvent([&](const XEventVisitor& event) {
225       first_op_offset_ps = std::min(first_op_offset_ps, event.OffsetPs());
226       last_op_offset_ps = std::max(last_op_offset_ps, event.EndOffsetPs());
227 
228       absl::string_view tf_op_full_name;
229       bool is_eager;
230       event.ForEachStat([&](const XStatVisitor& stat) {
231         if (stat.Type() == StatType::kLevel0 ||  // old way to deliver tf_op.
232             stat.Type() == StatType::kTfOp) {
233           tf_op_full_name = stat.StrOrRefValue();
234         } else if (stat.Type() == StatType::kIsEager) {
235           is_eager = stat.IntValue();
236         }
237       });
238       if (tf_op_full_name.empty()) return;
239       TfOp tf_op = ParseTfOpFullname(tf_op_full_name);
240       TfOpRoofLineCostEstimator::OpRoofLineStats costs;
241       if (tf_op.category != Category::kUnknown) {
242         costs = op_level_cost_estimator.Predict(event);
243       }
244       device_op_metrics_db_builder.EnterOp(
245           /*program_id=*/0, absl::StrCat(tf_op.name, "/", event.Name()),
246           tf_op.type, tf_op_full_name, is_eager,
247           /*occurrences=*/1, event.DurationPs(),
248           /*children_time_ps=*/0, costs.flops, costs.bytes_accessed);
249     });
250   });
251   result.set_total_time_ps(last_op_offset_ps - first_op_offset_ps);
252   AddIdleOp(&result);
253   return result;
254 }
255 
256 }  // namespace profiler
257 }  // namespace tensorflow
258