1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/profiler/convert/op_stats_combiner.h"
17 
18 #include "absl/container/flat_hash_map.h"
19 #include "tensorflow/core/platform/logging.h"
20 #include "tensorflow/core/platform/macros.h"
21 #include "tensorflow/core/platform/protobuf.h"
22 #include "tensorflow/core/profiler/convert/op_metrics_db_combiner.h"
23 #include "tensorflow/core/profiler/convert/xplane_to_tf_functions.h"
24 #include "tensorflow/core/profiler/protobuf/diagnostics.pb.h"
25 #include "tensorflow/core/profiler/protobuf/hardware_types.pb.h"
26 #include "tensorflow/core/profiler/protobuf/kernel_stats.pb.h"
27 #include "tensorflow/core/profiler/protobuf/op_stats.pb.h"
28 #include "tensorflow/core/profiler/protobuf/steps_db.pb.h"
29 #include "tensorflow/core/profiler/utils/hardware_type_utils.h"
30 #include "tensorflow/core/profiler/utils/kernel_stats_utils.h"
31 #include "tensorflow/core/profiler/utils/step_intersection.h"
32 
33 namespace tensorflow {
34 namespace profiler {
35 
36 namespace {
37 
38 // Combines the src PerCoreStepInfo into the dst PerCoreStepInfo.
CombinePerCoreStepInfo(int src_host_id,const PerCoreStepInfo & src,bool use_incomplete_step,PerCoreStepInfo * dst,OpMetricsDbCombiner * hlo_metrics_db_complete_steps_only_combiner,OpMetricsDbCombiner * hlo_metrics_db_per_step_combiner)39 void CombinePerCoreStepInfo(
40     int src_host_id, const PerCoreStepInfo& src, bool use_incomplete_step,
41     PerCoreStepInfo* dst,
42     OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
43     OpMetricsDbCombiner* hlo_metrics_db_per_step_combiner) {
44   CombineCoreIdMap(src_host_id, src.step_info_per_core(),
45                    dst->mutable_step_info_per_core());
46 
47   // Since we have assigned a new step number to the combined result, update
48   // the step number on each core to this new step number.
49   uint32 new_step_num = dst->step_num();
50   for (auto& percore_stepinfo : *dst->mutable_step_info_per_core()) {
51     auto& stepinfo = percore_stepinfo.second;
52     stepinfo.set_step_num(new_step_num);
53   }
54 
55   if (!use_incomplete_step) {
56     hlo_metrics_db_complete_steps_only_combiner->Combine(src.hlo_metrics_db());
57   }
58   hlo_metrics_db_per_step_combiner->Combine(src.hlo_metrics_db());
59   CombineCoreIdMap(src_host_id, src.flow_db_per_core(),
60                    dst->mutable_flow_db_per_core());
61   CombineCoreIdMap(src_host_id, src.all_reduce_db_per_core(),
62                    dst->mutable_all_reduce_db_per_core());
63   CombineCoreIdMap(src_host_id, src.core_id_to_replica_id_map(),
64                    dst->mutable_core_id_to_replica_id_map());
65 }
66 
CombineStepDatabase(int src_host_id,const StepIntersection & step_intersection,const StepDatabaseResult & src,StepDatabaseResult * dst,OpMetricsDbCombiner * hlo_metrics_db_complete_steps_only_combiner,std::vector<OpMetricsDbCombiner> * hlo_metrics_db_per_step_combiners)67 void CombineStepDatabase(
68     int src_host_id, const StepIntersection& step_intersection,
69     const StepDatabaseResult& src, StepDatabaseResult* dst,
70     OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
71     std::vector<OpMetricsDbCombiner>* hlo_metrics_db_per_step_combiners) {
72   if (src.use_incomplete_step()) dst->set_use_incomplete_step(true);
73   uint32 src_first_step_idx = step_intersection.FirstStepIndex(src_host_id);
74   for (uint32 i = 0; i < step_intersection.NumSteps(); i++) {
75     CombinePerCoreStepInfo(
76         src_host_id, src.step_sequence(src_first_step_idx + i),
77         src.use_incomplete_step(), dst->mutable_step_sequence(i),
78         hlo_metrics_db_complete_steps_only_combiner,
79         &(*hlo_metrics_db_per_step_combiners)[i]);
80   }
81 }
82 
CombineRunEnvironment(const RunEnvironment & src,RunEnvironment * dst)83 void CombineRunEnvironment(const RunEnvironment& src, RunEnvironment* dst) {
84   dst->mutable_hostnames()->insert(src.hostnames().begin(),
85                                    src.hostnames().end());
86   dst->set_host_count(dst->hostnames_size());
87   if (src.device_type() != "CPU") {
88     dst->set_device_type(src.device_type());
89     // TODO(b/111402648): Batch size may differ per-core. Currently, we report
90     // the max batch size. We need to come up with a better measure.
91     dst->set_per_core_batch_size(
92         std::max(src.per_core_batch_size(), dst->per_core_batch_size()));
93     dst->set_device_core_count(src.device_core_count() +
94                                dst->device_core_count());
95     // Replica count and num cores per replica must be same for all copies.
96     dst->set_replica_count(std::max(src.replica_count(), dst->replica_count()));
97     dst->set_num_cores_per_replica(
98         std::max(src.num_cores_per_replica(), dst->num_cores_per_replica()));
99     *dst->mutable_topology() = src.topology();
100   } else if (dst->device_type().empty()) {
101     dst->set_device_type(src.device_type());
102   }
103   dst->set_task_count(src.task_count() + dst->task_count());
104   (*dst->mutable_host_independent_job_info()) = src.host_independent_job_info();
105   for (const auto& job_info : src.host_dependent_job_info()) {
106     *(dst->add_host_dependent_job_info()) = job_info;
107   }
108   dst->set_host_trace_level(src.host_trace_level());
109 }
110 
111 // Combines the src PerfEnv into the dst PerfEnv.
CombinePerfEnv(const PerfEnv & src,PerfEnv * dst)112 void CombinePerfEnv(const PerfEnv& src, PerfEnv* dst) {
113   dst->set_peak_tera_flops_per_second(src.peak_tera_flops_per_second());
114   dst->set_peak_hbm_bw_giga_bytes_per_second(
115       src.peak_hbm_bw_giga_bytes_per_second());
116   dst->set_ridge_point(src.ridge_point());
117 }
118 
119 // Combines the src Diagnostics into the dst Diagnostics.
CombineDiagnostics(const Diagnostics & src,Diagnostics * dst)120 void CombineDiagnostics(const Diagnostics& src, Diagnostics* dst) {
121   dst->mutable_info()->MergeFrom(src.info());
122   dst->mutable_warnings()->MergeFrom(src.warnings());
123   dst->mutable_errors()->MergeFrom(src.errors());
124 }
125 
126 // Combine the src OpStats into the dst OpStats.
CombineOpStats(bool no_accelerator_in_system,int src_host_id,HardwareType hardware_type,const StepIntersection & step_intersection,const OpStats & src,OpStats * dst,OpMetricsDbCombiner * host_op_metrics_db_combiner,OpMetricsDbCombiner * device_op_metrics_db_combiner,OpMetricsDbCombiner * hlo_metrics_db_complete_steps_only_combiner,std::vector<OpMetricsDbCombiner> * hlo_metrics_db_per_step_combiners)127 void CombineOpStats(
128     bool no_accelerator_in_system, int src_host_id, HardwareType hardware_type,
129     const StepIntersection& step_intersection, const OpStats& src, OpStats* dst,
130     OpMetricsDbCombiner* host_op_metrics_db_combiner,
131     OpMetricsDbCombiner* device_op_metrics_db_combiner,
132     OpMetricsDbCombiner* hlo_metrics_db_complete_steps_only_combiner,
133     std::vector<OpMetricsDbCombiner>* hlo_metrics_db_per_step_combiners) {
134   // Combine host_metrics_db.
135   host_op_metrics_db_combiner->Combine(src.host_op_metrics_db());
136   // Combine device_metrics_db.
137   device_op_metrics_db_combiner->Combine(src.device_op_metrics_db());
138 
139   // Combine step_db.
140   if (!IsCoordinator(no_accelerator_in_system, hardware_type)) {
141     CombineStepDatabase(src_host_id, step_intersection, src.step_db(),
142                         dst->mutable_step_db(),
143                         hlo_metrics_db_complete_steps_only_combiner,
144                         hlo_metrics_db_per_step_combiners);
145   }
146 
147   // Combine run environment info.
148   CombineRunEnvironment(src.run_environment(), dst->mutable_run_environment());
149 
150   // Combine the perf environment info.
151   CombinePerfEnv(src.perf_env(), dst->mutable_perf_env());
152 
153   // Combine diagnostics.
154   CombineDiagnostics(src.diagnostics(), dst->mutable_diagnostics());
155 
156   // Combine kernel stats.
157   dst->mutable_kernel_stats_db()->mutable_reports()->MergeFrom(
158       src.kernel_stats_db().reports());
159 
160   // Combine tf-function stats.
161   CombineTfFunctionDb(src.tf_function_db(), dst->mutable_tf_function_db());
162 
163   // Combine the mapping from core ID to details.
164   CombineCoreIdMap(src_host_id, src.core_id_to_details(),
165                    dst->mutable_core_id_to_details());
166 }
167 
168 }  // namespace
169 
IsCoordinator(bool no_accelerator_in_system,HardwareType hardware_type)170 bool IsCoordinator(bool no_accelerator_in_system, HardwareType hardware_type) {
171   // A host is a coordinator if:
172   //   (1) The host doesn't have a device, and
173   //   (2) The system does use accelerator (if not, it uses CPU only and so this
174   //   host should be regarded as a worker as well).
175   return !HasDevice(hardware_type) && !no_accelerator_in_system;
176 }
177 
NoAcceleratorInSystem(const std::vector<OpStatsInfo> & all_op_stats_info)178 bool NoAcceleratorInSystem(const std::vector<OpStatsInfo>& all_op_stats_info) {
179   for (const auto& op_stats_info : all_op_stats_info) {
180     if (HasDevice(op_stats_info.hardware_type)) {
181       return false;
182     }
183   }
184   return true;
185 }
186 
// Maps a (host_id, device_ordinal) pair to a single core ID computed as
// host_id * 1000 + device_ordinal.
uint32 GlobalCoreId(int host_id, uint32 device_ordinal) {
  // A power of 10 keeps the host and device parts readable when debugging.
  constexpr uint32 kMaxDevicesPerHost = 1000;
  const uint32 host_base = host_id * kMaxDevicesPerHost;
  return host_base + device_ordinal;
}
191 
ComputeStepIntersectionToMergeOpStats(const std::vector<OpStatsInfo> & all_op_stats_info,uint32 max_step_per_host)192 StepIntersection ComputeStepIntersectionToMergeOpStats(
193     const std::vector<OpStatsInfo>& all_op_stats_info,
194     uint32 max_step_per_host) {
195   bool no_accelerator_in_system = NoAcceleratorInSystem(all_op_stats_info);
196 
197   absl::flat_hash_map<uint32, const StepDatabaseResult*> per_host_step_db;
198   for (const auto& op_stats_info : all_op_stats_info) {
199     if (IsCoordinator(no_accelerator_in_system, op_stats_info.hardware_type))
200       continue;
201     // Includes only workers in per_host_step_db.
202     per_host_step_db[op_stats_info.src_host_id] =
203         &op_stats_info.op_stats->step_db();
204   }
205 
206   return StepIntersection(max_step_per_host, per_host_step_db);
207 }
208 
CombineAllOpStats(const std::vector<OpStatsInfo> & all_op_stats_info,const StepIntersection & step_intersection,OpStats * combined_op_stats)209 void CombineAllOpStats(const std::vector<OpStatsInfo>& all_op_stats_info,
210                        const StepIntersection& step_intersection,
211                        OpStats* combined_op_stats) {
212   StepDatabaseResult* combined_step_db = combined_op_stats->mutable_step_db();
213   // Initialize the StepDatabaseResult field that depends on the number of
214   // steps.
215   for (uint32 dst_step_num : step_intersection.DstStepNumbers()) {
216     combined_step_db->add_step_sequence()->set_step_num(dst_step_num);
217   }
218   // Record the number of steps that are dropped.
219   combined_step_db->set_num_steps_dropped(step_intersection.StepsDropped());
220 
221   // Set the default value of per_core_batch_size in <combined_op_stats>
222   combined_op_stats->mutable_run_environment()->set_per_core_batch_size(-1);
223 
224   // Initialize all the OpMetricsDbCombiners.
225   OpMetricsDbCombiner host_op_metrics_db_combiner(
226       combined_op_stats->mutable_host_op_metrics_db());
227   OpMetricsDbCombiner device_op_metrics_db_combiner(
228       combined_op_stats->mutable_device_op_metrics_db());
229   OpMetricsDbCombiner hlo_metrics_db_complete_steps_only_combiner(
230       combined_op_stats->mutable_hlo_metrics_db_complete_steps_only());
231   std::vector<OpMetricsDbCombiner> hlo_metrics_db_per_step_combiners;
232   hlo_metrics_db_per_step_combiners.reserve(
233       combined_step_db->step_sequence_size());
234   for (PerCoreStepInfo& step_info :
235        *combined_step_db->mutable_step_sequence()) {
236     hlo_metrics_db_per_step_combiners.emplace_back(
237         step_info.mutable_hlo_metrics_db());
238   }
239 
240   bool no_accelerator_in_system = NoAcceleratorInSystem(all_op_stats_info);
241 
242   for (const auto& op_stats_info : all_op_stats_info) {
243     CombineOpStats(no_accelerator_in_system, op_stats_info.src_host_id,
244                    op_stats_info.hardware_type, step_intersection,
245                    *op_stats_info.op_stats, combined_op_stats,
246                    &host_op_metrics_db_combiner, &device_op_metrics_db_combiner,
247                    &hlo_metrics_db_complete_steps_only_combiner,
248                    &hlo_metrics_db_per_step_combiners);
249   }
250 
251   // Sorts all the kernel reports that have been merged by CombineTfOpStats and
252   // keeps only the top kernel reports with long kernel duration.
253   SortAndKeepTopKDurationKernelReportsInDb(
254       combined_op_stats->mutable_kernel_stats_db());
255 }
256 
257 }  // namespace profiler
258 }  // namespace tensorflow
259