1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/profiler/convert/op_stats_combiner.h"
17
18 #include "absl/container/flat_hash_map.h"
19 #include "tensorflow/core/platform/logging.h"
20 #include "tensorflow/core/platform/macros.h"
21 #include "tensorflow/core/platform/protobuf.h"
22 #include "tensorflow/core/profiler/convert/op_metrics_db_combiner.h"
23 #include "tensorflow/core/profiler/convert/xplane_to_tf_functions.h"
24 #include "tensorflow/core/profiler/protobuf/diagnostics.pb.h"
25 #include "tensorflow/core/profiler/protobuf/hardware_types.pb.h"
26 #include "tensorflow/core/profiler/protobuf/kernel_stats.pb.h"
27 #include "tensorflow/core/profiler/protobuf/op_stats.pb.h"
28 #include "tensorflow/core/profiler/protobuf/steps_db.pb.h"
29 #include "tensorflow/core/profiler/utils/hardware_type_utils.h"
30 #include "tensorflow/core/profiler/utils/kernel_stats_utils.h"
31 #include "tensorflow/core/profiler/utils/step_intersection.h"
32
33 namespace tensorflow {
34 namespace profiler {
35
36 namespace {
37
38 // Combines the src PerCoreStepInfo into the dst PerCoreStepInfo.
CombinePerCoreStepInfo(int src_host_id,const PerCoreStepInfo & src,bool use_incomplete_step,PerCoreStepInfo * dst,OpMetricsDbCombiner * hlo_metrics_db_complete_steps_only_combiner,OpMetricsDbCombiner * hlo_metrics_db_per_step_combiner)39 void CombinePerCoreStepInfo(
40 int src_host_id, const PerCoreStepInfo& src, bool use_incomplete_step,
41 PerCoreStepInfo* dst,
42 OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
43 OpMetricsDbCombiner* hlo_metrics_db_per_step_combiner) {
44 CombineCoreIdMap(src_host_id, src.step_info_per_core(),
45 dst->mutable_step_info_per_core());
46
47 // Since we have assigned a new step number to the combined result, update
48 // the step number on each core to this new step number.
49 uint32 new_step_num = dst->step_num();
50 for (auto& percore_stepinfo : *dst->mutable_step_info_per_core()) {
51 auto& stepinfo = percore_stepinfo.second;
52 stepinfo.set_step_num(new_step_num);
53 }
54
55 if (!use_incomplete_step) {
56 hlo_metrics_db_complete_steps_only_combiner->Combine(src.hlo_metrics_db());
57 }
58 hlo_metrics_db_per_step_combiner->Combine(src.hlo_metrics_db());
59 CombineCoreIdMap(src_host_id, src.flow_db_per_core(),
60 dst->mutable_flow_db_per_core());
61 CombineCoreIdMap(src_host_id, src.all_reduce_db_per_core(),
62 dst->mutable_all_reduce_db_per_core());
63 CombineCoreIdMap(src_host_id, src.core_id_to_replica_id_map(),
64 dst->mutable_core_id_to_replica_id_map());
65 }
66
CombineStepDatabase(int src_host_id,const StepIntersection & step_intersection,const StepDatabaseResult & src,StepDatabaseResult * dst,OpMetricsDbCombiner * hlo_metrics_db_complete_steps_only_combiner,std::vector<OpMetricsDbCombiner> * hlo_metrics_db_per_step_combiners)67 void CombineStepDatabase(
68 int src_host_id, const StepIntersection& step_intersection,
69 const StepDatabaseResult& src, StepDatabaseResult* dst,
70 OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
71 std::vector<OpMetricsDbCombiner>* hlo_metrics_db_per_step_combiners) {
72 if (src.use_incomplete_step()) dst->set_use_incomplete_step(true);
73 uint32 src_first_step_idx = step_intersection.FirstStepIndex(src_host_id);
74 for (uint32 i = 0; i < step_intersection.NumSteps(); i++) {
75 CombinePerCoreStepInfo(
76 src_host_id, src.step_sequence(src_first_step_idx + i),
77 src.use_incomplete_step(), dst->mutable_step_sequence(i),
78 hlo_metrics_db_complete_steps_only_combiner,
79 &(*hlo_metrics_db_per_step_combiners)[i]);
80 }
81 }
82
CombineRunEnvironment(const RunEnvironment & src,RunEnvironment * dst)83 void CombineRunEnvironment(const RunEnvironment& src, RunEnvironment* dst) {
84 dst->mutable_hostnames()->insert(src.hostnames().begin(),
85 src.hostnames().end());
86 dst->set_host_count(dst->hostnames_size());
87 if (src.device_type() != "CPU") {
88 dst->set_device_type(src.device_type());
89 // TODO(b/111402648): Batch size may differ per-core. Currently, we report
90 // the max batch size. We need to come up with a better measure.
91 dst->set_per_core_batch_size(
92 std::max(src.per_core_batch_size(), dst->per_core_batch_size()));
93 dst->set_device_core_count(src.device_core_count() +
94 dst->device_core_count());
95 // Replica count and num cores per replica must be same for all copies.
96 dst->set_replica_count(std::max(src.replica_count(), dst->replica_count()));
97 dst->set_num_cores_per_replica(
98 std::max(src.num_cores_per_replica(), dst->num_cores_per_replica()));
99 *dst->mutable_topology() = src.topology();
100 } else if (dst->device_type().empty()) {
101 dst->set_device_type(src.device_type());
102 }
103 dst->set_task_count(src.task_count() + dst->task_count());
104 (*dst->mutable_host_independent_job_info()) = src.host_independent_job_info();
105 for (const auto& job_info : src.host_dependent_job_info()) {
106 *(dst->add_host_dependent_job_info()) = job_info;
107 }
108 dst->set_host_trace_level(src.host_trace_level());
109 }
110
111 // Combines the src PerfEnv into the dst PerfEnv.
CombinePerfEnv(const PerfEnv & src,PerfEnv * dst)112 void CombinePerfEnv(const PerfEnv& src, PerfEnv* dst) {
113 dst->set_peak_tera_flops_per_second(src.peak_tera_flops_per_second());
114 dst->set_peak_hbm_bw_giga_bytes_per_second(
115 src.peak_hbm_bw_giga_bytes_per_second());
116 dst->set_ridge_point(src.ridge_point());
117 }
118
119 // Combines the src Diagnostics into the dst Diagnostics.
CombineDiagnostics(const Diagnostics & src,Diagnostics * dst)120 void CombineDiagnostics(const Diagnostics& src, Diagnostics* dst) {
121 dst->mutable_info()->MergeFrom(src.info());
122 dst->mutable_warnings()->MergeFrom(src.warnings());
123 dst->mutable_errors()->MergeFrom(src.errors());
124 }
125
126 // Combine the src OpStats into the dst OpStats.
CombineOpStats(bool no_accelerator_in_system,int src_host_id,HardwareType hardware_type,const StepIntersection & step_intersection,const OpStats & src,OpStats * dst,OpMetricsDbCombiner * host_op_metrics_db_combiner,OpMetricsDbCombiner * device_op_metrics_db_combiner,OpMetricsDbCombiner * hlo_metrics_db_complete_steps_only_combiner,std::vector<OpMetricsDbCombiner> * hlo_metrics_db_per_step_combiners)127 void CombineOpStats(
128 bool no_accelerator_in_system, int src_host_id, HardwareType hardware_type,
129 const StepIntersection& step_intersection, const OpStats& src, OpStats* dst,
130 OpMetricsDbCombiner* host_op_metrics_db_combiner,
131 OpMetricsDbCombiner* device_op_metrics_db_combiner,
132 OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
133 std::vector<OpMetricsDbCombiner>* hlo_metrics_db_per_step_combiners) {
134 // Combine host_metrics_db.
135 host_op_metrics_db_combiner->Combine(src.host_op_metrics_db());
136 // Combine device_metrics_db.
137 device_op_metrics_db_combiner->Combine(src.device_op_metrics_db());
138
139 // Combine step_db.
140 if (!IsCoordinator(no_accelerator_in_system, hardware_type)) {
141 CombineStepDatabase(src_host_id, step_intersection, src.step_db(),
142 dst->mutable_step_db(),
143 hlo_metrics_db_complete_steps_only_combiner,
144 hlo_metrics_db_per_step_combiners);
145 }
146
147 // Combine run environment info.
148 CombineRunEnvironment(src.run_environment(), dst->mutable_run_environment());
149
150 // Combine the perf environment info.
151 CombinePerfEnv(src.perf_env(), dst->mutable_perf_env());
152
153 // Combine diagnostics.
154 CombineDiagnostics(src.diagnostics(), dst->mutable_diagnostics());
155
156 // Combine kernel stats.
157 dst->mutable_kernel_stats_db()->mutable_reports()->MergeFrom(
158 src.kernel_stats_db().reports());
159
160 // Combine tf-function stats.
161 CombineTfFunctionDb(src.tf_function_db(), dst->mutable_tf_function_db());
162
163 // Combine the mapping from core ID to details.
164 CombineCoreIdMap(src_host_id, src.core_id_to_details(),
165 dst->mutable_core_id_to_details());
166 }
167
168 } // namespace
169
IsCoordinator(bool no_accelerator_in_system,HardwareType hardware_type)170 bool IsCoordinator(bool no_accelerator_in_system, HardwareType hardware_type) {
171 // A host is a coordinator if:
172 // (1) The host doesn't have a device, and
173 // (2) The system does use accelerator (if not, it uses CPU only and so this
174 // host should be regarded as a worker as well).
175 return !HasDevice(hardware_type) && !no_accelerator_in_system;
176 }
177
NoAcceleratorInSystem(const std::vector<OpStatsInfo> & all_op_stats_info)178 bool NoAcceleratorInSystem(const std::vector<OpStatsInfo>& all_op_stats_info) {
179 for (const auto& op_stats_info : all_op_stats_info) {
180 if (HasDevice(op_stats_info.hardware_type)) {
181 return false;
182 }
183 }
184 return true;
185 }
186
GlobalCoreId(int host_id,uint32 device_ordinal)187 uint32 GlobalCoreId(int host_id, uint32 device_ordinal) {
188 constexpr uint32 kMaxDevicesPerHost = 1000; // power-of-10 for debuggability
189 return host_id * kMaxDevicesPerHost + device_ordinal;
190 }
191
ComputeStepIntersectionToMergeOpStats(const std::vector<OpStatsInfo> & all_op_stats_info,uint32 max_step_per_host)192 StepIntersection ComputeStepIntersectionToMergeOpStats(
193 const std::vector<OpStatsInfo>& all_op_stats_info,
194 uint32 max_step_per_host) {
195 bool no_accelerator_in_system = NoAcceleratorInSystem(all_op_stats_info);
196
197 absl::flat_hash_map<uint32, const StepDatabaseResult*> per_host_step_db;
198 for (const auto& op_stats_info : all_op_stats_info) {
199 if (IsCoordinator(no_accelerator_in_system, op_stats_info.hardware_type))
200 continue;
201 // Includes only workers in per_host_step_db.
202 per_host_step_db[op_stats_info.src_host_id] =
203 &op_stats_info.op_stats->step_db();
204 }
205
206 return StepIntersection(max_step_per_host, per_host_step_db);
207 }
208
CombineAllOpStats(const std::vector<OpStatsInfo> & all_op_stats_info,const StepIntersection & step_intersection,OpStats * combined_op_stats)209 void CombineAllOpStats(const std::vector<OpStatsInfo>& all_op_stats_info,
210 const StepIntersection& step_intersection,
211 OpStats* combined_op_stats) {
212 StepDatabaseResult* combined_step_db = combined_op_stats->mutable_step_db();
213 // Initialize the StepDatabaseResult field that depends on the number of
214 // steps.
215 for (uint32 dst_step_num : step_intersection.DstStepNumbers()) {
216 combined_step_db->add_step_sequence()->set_step_num(dst_step_num);
217 }
218 // Record the number of steps that are dropped.
219 combined_step_db->set_num_steps_dropped(step_intersection.StepsDropped());
220
221 // Set the default value of per_core_batch_size in <combined_op_stats>
222 combined_op_stats->mutable_run_environment()->set_per_core_batch_size(-1);
223
224 // Initialize all the OpMetricsDbCombiners.
225 OpMetricsDbCombiner host_op_metrics_db_combiner(
226 combined_op_stats->mutable_host_op_metrics_db());
227 OpMetricsDbCombiner device_op_metrics_db_combiner(
228 combined_op_stats->mutable_device_op_metrics_db());
229 OpMetricsDbCombiner hlo_metrics_db_complete_steps_only_combiner(
230 combined_op_stats->mutable_hlo_metrics_db_complete_steps_only());
231 std::vector<OpMetricsDbCombiner> hlo_metrics_db_per_step_combiners;
232 hlo_metrics_db_per_step_combiners.reserve(
233 combined_step_db->step_sequence_size());
234 for (PerCoreStepInfo& step_info :
235 *combined_step_db->mutable_step_sequence()) {
236 hlo_metrics_db_per_step_combiners.emplace_back(
237 step_info.mutable_hlo_metrics_db());
238 }
239
240 bool no_accelerator_in_system = NoAcceleratorInSystem(all_op_stats_info);
241
242 for (const auto& op_stats_info : all_op_stats_info) {
243 CombineOpStats(no_accelerator_in_system, op_stats_info.src_host_id,
244 op_stats_info.hardware_type, step_intersection,
245 *op_stats_info.op_stats, combined_op_stats,
246 &host_op_metrics_db_combiner, &device_op_metrics_db_combiner,
247 &hlo_metrics_db_complete_steps_only_combiner,
248 &hlo_metrics_db_per_step_combiners);
249 }
250
251 // Sorts all the kernel reports that have been merged by CombineTfOpStats and
252 // keeps only the top kernel reports with long kernel duration.
253 SortAndKeepTopKDurationKernelReportsInDb(
254 combined_op_stats->mutable_kernel_stats_db());
255 }
256
257 } // namespace profiler
258 } // namespace tensorflow
259