1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/common_runtime/step_stats_collector.h"
17 #include "tensorflow/core/common_runtime/costmodel_manager.h"
18 #include "tensorflow/core/framework/allocation_description.pb.h"
19 #include "tensorflow/core/framework/op_kernel.h"
20 #include "tensorflow/core/framework/tensor.h"
21 #include "tensorflow/core/framework/tensor_description.pb.h"
22 #include "tensorflow/core/framework/tracking_allocator.h"
23 #include "tensorflow/core/graph/costmodel.h"
24 #include "tensorflow/core/graph/graph.h"
25 #include "tensorflow/core/lib/core/stringpiece.h"
26 #include "tensorflow/core/lib/strings/numbers.h"
27 #include "tensorflow/core/lib/strings/scanner.h"
28 #include "tensorflow/core/lib/strings/stringprintf.h"
29 #include "tensorflow/core/platform/logging.h"
30 #include "tensorflow/core/util/ptr_util.h"
31 
32 namespace tensorflow {
namespace {
// Node-count threshold used by ReportAllocsOnResourceExhausted() to cut off
// the per-<device, allocator> listing of nodes.
const int kMaxAllocReportNodes = 100;
// Byte-fraction threshold: the listing is also cut off once the bytes reported
// so far reach this fraction of the pair's total bytes.
const float kMaxAllocReportFraction = 0.99;

// Aggregate of the current allocations for one <device, allocator> pair,
// filled in while building the resource-exhausted report.
struct AllocStats {
  // Current byte count -> names of the nodes holding exactly that many bytes.
  // Ordered by size so the report can walk it from largest to smallest.
  std::map<int64, std::vector<string>> nodes_by_size;
  // Sum of the byte counts of every entry recorded in `nodes_by_size`.
  int64 total_bytes = 0;
  // Number of entries recorded in `nodes_by_size`.
  int64 total_nodes = 0;
};
}  // namespace
43 
NodeExecStatsWrapper(const Node * node,StepStatsCollector * step_stats_collector)44 NodeExecStatsWrapper::NodeExecStatsWrapper(
45     const Node* node, StepStatsCollector* step_stats_collector)
46     : NodeExecStatsWrapper(MakeUnique<NodeExecStats>(), node,
47                            step_stats_collector) {
48   stats_->set_node_name(node->name());
49 }
50 
NodeExecStatsWrapper(std::unique_ptr<NodeExecStats> stats,const Node * node,StepStatsCollector * step_stats_collector)51 NodeExecStatsWrapper::NodeExecStatsWrapper(
52     std::unique_ptr<NodeExecStats> stats, const Node* node,
53     StepStatsCollector* step_stats_collector)
54     : stats_(std::move(stats)),
55       node_(node),
56       step_stats_collector_(step_stats_collector) {}
57 
Done(const string & device)58 void NodeExecStatsWrapper::Done(const string& device) {
59   // TODO(tucker): merge with the DetailText function in session.cc in a common
60   // location.
61   DCHECK(node_);
62   string memory;
63   for (auto& all : stats_->memory()) {
64     int64 tot = all.total_bytes();
65     if (tot >= 0.1 * 1048576.0) {
66       int64 peak = all.peak_bytes();
67       if (peak > 0) {
68         memory =
69             strings::StrCat(memory, "[", all.allocator_name(),
70                             strings::Printf(" %.1fMB %.1fMB] ", tot / 1048576.0,
71                                             peak / 1048576.0));
72       } else {
73         memory = strings::StrCat(memory, "[", all.allocator_name(),
74                                  strings::Printf(" %.1fMB] ", tot / 1048576.0));
75       }
76     }
77   }
78   const AttrSlice attrs = node_->attrs();
79   string text;
80   if (IsSend(node_)) {
81     string tensor_name;
82     TF_CHECK_OK(GetNodeAttr(attrs, "tensor_name", &tensor_name));
83     string recv_device;
84     TF_CHECK_OK(GetNodeAttr(attrs, "recv_device", &recv_device));
85     text = strings::StrCat(memory, node_->name(), " = ", node_->type_string(),
86                            "(", tensor_name, " @", recv_device);
87   } else if (IsRecv(node_)) {
88     string tensor_name;
89     TF_CHECK_OK(GetNodeAttr(attrs, "tensor_name", &tensor_name));
90     string send_device;
91     TF_CHECK_OK(GetNodeAttr(attrs, "send_device", &send_device));
92     text = strings::StrCat(memory, node_->name(), " = ", node_->type_string(),
93                            "(", tensor_name, " @", send_device);
94   } else {
95     text =
96         strings::StrCat(memory, node_->name(), " = ", node_->type_string(), "(",
97                         str_util::Join(node_->requested_inputs(), ", "), ")");
98   }
99   stats_->set_timeline_label(text);
100   step_stats_collector_->Save(device, this);
101 }
102 
RecordExecutorStarted()103 void NodeExecStatsWrapper::RecordExecutorStarted() {
104   int64 now_nanos = Env::Default()->NowNanos();
105   stats_->set_all_start_micros(now_nanos / EnvTime::kMicrosToNanos);
106   stats_->set_all_start_nanos(now_nanos);
107 }
108 
RecordComputeStarted()109 void NodeExecStatsWrapper::RecordComputeStarted() {
110   int64 now_nanos = Env::Default()->NowNanos();
111   DCHECK_NE(stats_->all_start_micros(), 0);
112   DCHECK_NE(stats_->all_start_nanos(), 0);
113   stats_->set_op_start_rel_micros(now_nanos / EnvTime::kMicrosToNanos -
114                                   stats_->all_start_micros());
115   stats_->set_op_start_rel_nanos(now_nanos - stats_->all_start_nanos());
116 }
117 
RecordComputeEnded()118 void NodeExecStatsWrapper::RecordComputeEnded() {
119   int64 now_nanos = Env::Default()->NowNanos();
120   DCHECK_NE(stats_->all_start_micros(), 0);
121   DCHECK_NE(stats_->all_start_nanos(), 0);
122   stats_->set_op_end_rel_micros(now_nanos / EnvTime::kMicrosToNanos -
123                                 stats_->all_start_micros());
124   stats_->set_op_end_rel_nanos(now_nanos - stats_->all_start_nanos());
125 }
126 
RecordExecutorEnded()127 void NodeExecStatsWrapper::RecordExecutorEnded() {
128   int64 now_nanos = Env::Default()->NowNanos();
129   DCHECK_NE(stats_->all_start_micros(), 0);
130   DCHECK_NE(stats_->all_start_nanos(), 0);
131   stats_->set_all_end_rel_micros(now_nanos / EnvTime::kMicrosToNanos -
132                                  stats_->all_start_micros());
133   stats_->set_all_end_rel_nanos(now_nanos - stats_->all_start_nanos());
134 }
135 
SetScheduled(int64 nanos)136 void NodeExecStatsWrapper::SetScheduled(int64 nanos) {
137   stats_->set_scheduled_micros(nanos / EnvTime::kMicrosToNanos);
138   stats_->set_scheduled_nanos(nanos);
139 }
140 
SetMemory(OpKernelContext * ctx)141 void NodeExecStatsWrapper::SetMemory(OpKernelContext* ctx) {
142   for (const auto& allocator_pair : ctx->ConsumeWrappedAllocators()) {
143     AddAllocation(allocator_pair.first, allocator_pair.second);
144   }
145   auto* ms = stats_->mutable_memory_stats();
146   ms->set_temp_memory_size(ctx->temp_memory_allocated());
147   for (const auto& alloc_id : ctx->persistent_alloc_ids()) {
148     ms->mutable_persistent_tensor_alloc_ids()->Add(alloc_id);
149   }
150   ms->set_persistent_memory_size(ctx->persistent_memory_allocated());
151 }
152 
SetOutput(int slot,const Tensor * tensor)153 void NodeExecStatsWrapper::SetOutput(int slot, const Tensor* tensor) {
154   DCHECK(tensor);
155   NodeOutput* node_output = stats_->add_output();
156   node_output->set_slot(slot);
157   tensor->FillDescription(node_output->mutable_tensor_description());
158 }
159 
SetReferencedTensors(const TensorReferenceVector & tensors)160 void NodeExecStatsWrapper::SetReferencedTensors(
161     const TensorReferenceVector& tensors) {
162   // be careful not to increment the reference count on any tensor
163   // while recording the information
164   for (size_t i = 0; i < tensors.size(); ++i) {
165     AllocationDescription* description = stats_->add_referenced_tensor();
166     tensors.at(i).FillDescription(description);
167   }
168 }
169 
AddAllocation(Allocator * allocator,TrackingAllocator * tracking_allocator)170 void NodeExecStatsWrapper::AddAllocation(
171     Allocator* allocator, TrackingAllocator* tracking_allocator) {
172   AllocatorMemoryUsed* memory = stats_->add_memory();
173   memory->set_allocator_name(allocator->Name());
174   auto sizes = tracking_allocator->GetSizes();
175   memory->set_total_bytes(std::get<0>(sizes));
176   memory->set_peak_bytes(std::get<1>(sizes));
177   memory->set_live_bytes(std::get<2>(sizes));
178 
179   absl::optional<AllocatorStats> stats = allocator->GetStats();
180   if (stats) {
181     memory->set_allocator_bytes_in_use(stats->bytes_in_use);
182   }
183   allocations_.push_back(std::make_pair(memory, tracking_allocator));
184 }
185 
Finalize()186 void NodeExecStatsWrapper::Finalize() {
187   for (auto& alloc : allocations_) {
188     AllocatorMemoryUsed* memory = alloc.first;
189     for (auto& record : alloc.second->GetRecordsAndUnRef()) {
190       auto* r = memory->add_allocation_records();
191       r->set_alloc_bytes(record.alloc_bytes);
192       r->set_alloc_micros(record.alloc_micros);
193     }
194   }
195   allocations_.clear();
196 }
197 
StepStatsCollector(StepStats * step_stats)198 StepStatsCollector::StepStatsCollector(StepStats* step_stats)
199     : finalized_(false), step_stats_(step_stats) {}
200 
ExtractGpuWithStreamAll(string device_name)201 static int ExtractGpuWithStreamAll(string device_name) {
202   // Check if the device name matches the ".*gpu:(\\d+)/stream:all$" regexp,
203   // and if it does return the stream index (always positive). If it doesn't
204   // return -1.
205 
206   // The best way to parse this regexp using a scanner is to parse it in
207   // reverse starting from the end.
208   std::reverse(device_name.begin(), device_name.end());
209   strings::Scanner scanner(device_name);
210   // Check that the string end with '/stream:all'
211   scanner.OneLiteral("lla:maerts/");
212   // Capture the digits if present
213   scanner.RestartCapture().Many(strings::Scanner::DIGIT).StopCapture();
214   // Check that the digits are preceded by the 'device:GPU:' string
215   scanner.OneLiteral(":UPG:ecived");
216   StringPiece capture;
217   bool matched = scanner.GetResult(nullptr, &capture);
218 
219   if (!matched) {
220     return -1;
221   } else {
222     // Convert the captured string into an integer. But first we need to put
223     // the digits back in order
224     string ordered_capture(capture);
225     std::reverse(ordered_capture.begin(), ordered_capture.end());
226     int gpu_id;
227     CHECK(strings::safe_strto32(ordered_capture, &gpu_id));
228     return gpu_id;
229   }
230 }
231 
ExtractGpuWithoutStream(string device_name)232 static int ExtractGpuWithoutStream(string device_name) {
233   // Check if the device name matches the ".*gpu:(\\d+)$" regexp,
234   // and if it does return the stream index (always positive). If it doesn't
235   // return -1.
236 
237   // The best way to parse this regexp using a scanner is to parse it in
238   // reverse starting from the end.
239   std::reverse(device_name.begin(), device_name.end());
240   strings::Scanner scanner(device_name);
241   // Capture the trailing digits if present
242   scanner.RestartCapture().Many(strings::Scanner::DIGIT).StopCapture();
243   // Check that the digits are preceded by the 'device:GPU:' string
244   scanner.OneLiteral(":UPG:ecived");
245   StringPiece capture;
246   bool matched = scanner.GetResult(nullptr, &capture);
247 
248   if (!matched) {
249     return -1;
250   } else {
251     // Convert the captured string into an integer. But first we need to put
252     // the digits back in order
253     string ordered_capture(capture);
254     std::reverse(ordered_capture.begin(), ordered_capture.end());
255     int gpu_id;
256     CHECK(strings::safe_strto32(ordered_capture, &gpu_id));
257     return gpu_id;
258   }
259 }
260 
BuildCostModel(CostModelManager * cost_model_manager,const std::unordered_map<string,const Graph * > & device_map)261 void StepStatsCollector::BuildCostModel(
262     CostModelManager* cost_model_manager,
263     const std::unordered_map<string, const Graph*>& device_map) {
264   mutex_lock lock(mu_);
265 
266   if (!finalized_) {
267     FinalizeInternal();
268   }
269   // Hardware stats for gpu are available under a fake device named
270   // "gpu:<id>/stream::all.
271   // Use them instead of regular stats whenever they're available to extract
272   // the execution stats of a particular node since they're more accurate.
273   // However hardware traces don't record memory usage, so we still have to
274   // rely on regular traces to track memory usage.
275   struct DeviceStats {
276     const DeviceStepStats* regular_stats;
277     const DeviceStepStats* hardware_stats;
278   };
279 
280   std::unordered_map<StringPiece, DeviceStats, StringPieceHasher>
281       per_device_stats;
282   std::unordered_map<int, const DeviceStepStats*> gpu_hardware_stats;
283 
284   for (int i = 0; i < step_stats_->dev_stats_size(); ++i) {
285     const DeviceStepStats& device_stats = step_stats_->dev_stats(i);
286     const string& device_name = device_stats.device();
287     const int gpu_id = ExtractGpuWithStreamAll(device_name);
288     if (gpu_id >= 0) {
289       // These are gpu hardware stats
290       gpu_hardware_stats.emplace(gpu_id, &device_stats);
291     } else {
292       // These are regular stats.
293       per_device_stats.emplace(device_name,
294                                DeviceStats{&device_stats, nullptr});
295     }
296   }
297 
298   for (auto& itr : per_device_stats) {
299     const StringPiece device_name = itr.first;
300     const int gpu_id = ExtractGpuWithoutStream(string(device_name));
301     if (gpu_id >= 0) {
302       // Reference the gpu hardware stats in addition to the regular stats
303       // for this gpu device if they're available.
304       if (gpu_hardware_stats.find(gpu_id) != gpu_hardware_stats.end()) {
305         itr.second.hardware_stats = gpu_hardware_stats.find(gpu_id)->second;
306       }
307     }
308   }
309 
310   for (auto itr : device_map) {
311     const StringPiece device = itr.first;
312     if (per_device_stats.find(device) == per_device_stats.end()) {
313       continue;
314     }
315 
316     const Graph* graph = itr.second;
317     CostModel* cm = cost_model_manager->FindOrCreateCostModel(graph);
318     cm->IncrementUpdateTimes();
319 
320     std::unordered_map<StringPiece, Node*, StringPieceHasher> name_to_node;
321     for (Node* n : graph->nodes()) {
322       name_to_node.emplace(n->name(), n);
323     }
324 
325     const DeviceStats& dev_stats = per_device_stats.find(device)->second;
326 
327     std::unordered_map<string, NodeExecStats> name_to_hw_node_stats;
328     if (dev_stats.hardware_stats) {
329       for (const auto& node_stats : dev_stats.hardware_stats->node_stats()) {
330         string node_name = node_stats.node_name();
331         // Remove the part of op name (e.g. :Conv2D) in the end of a node name.
332         size_t pos = node_name.find_first_of(":");
333         if (pos != std::string::npos) {
334           node_name = node_name.substr(0, pos);
335         }
336         // Certain ops (e.g. Conv2D) are implemented with multiple GPU kernels,
337         // which results in multiple NodeExecStats with the same node name. For
338         // such ops, we sum up the time for all its GPU kernels.
339         if (name_to_hw_node_stats.find(node_name) !=
340             name_to_hw_node_stats.end()) {
341           int64 time = name_to_hw_node_stats[node_name].op_end_rel_micros();
342           name_to_hw_node_stats[node_name].set_op_end_rel_micros(
343               time + node_stats.op_end_rel_micros());
344         } else {
345           name_to_hw_node_stats.emplace(node_name, node_stats);
346         }
347       }
348     }
349 
350     for (int i = 0; i < dev_stats.regular_stats->node_stats_size(); ++i) {
351       const NodeExecStats& stats = dev_stats.regular_stats->node_stats(i);
352       const Node* node = name_to_node[stats.node_name()];
353       if (node) {
354         for (int i = 0; i < stats.output_size(); ++i) {
355           const auto& output = stats.output(i);
356           int output_slot = output.slot();
357           cm->RecordMaxMemorySize(node, output_slot,
358                                   Bytes(output.tensor_description()
359                                             .allocation_description()
360                                             .allocated_bytes()),
361                                   output.tensor_description().shape(),
362                                   node->output_types()[output_slot]);
363           cm->RecordAllocationId(node, output_slot,
364                                  output.tensor_description()
365                                      .allocation_description()
366                                      .allocation_id());
367         }
368         cm->RecordMemoryStats(node, stats.memory_stats());
369         // Use hardware stats to record the execution time if they're available,
370         // otherwise use the regular (less accurate) stats
371         string node_name = dev_stats.regular_stats->node_stats(i).node_name();
372         if (dev_stats.hardware_stats && name_to_hw_node_stats.find(node_name) !=
373                                             name_to_hw_node_stats.end()) {
374           const NodeExecStats& hw_stats = name_to_hw_node_stats[node_name];
375           cm->RecordMaxExecutionTime(
376               node, Microseconds(hw_stats.op_end_rel_micros()));
377         } else {
378           cm->RecordMaxExecutionTime(node,
379                                      Microseconds(stats.op_end_rel_micros()));
380         }
381       }
382     }
383   }
384 }
385 
Save(const string & device,NodeExecStats * node_stats_pb)386 void StepStatsCollector::Save(const string& device,
387                               NodeExecStats* node_stats_pb) {
388   Save(device,
389        new NodeExecStatsWrapper(std::unique_ptr<NodeExecStats>(node_stats_pb),
390                                 nullptr, this));
391 }
392 
Save(const string & device,NodeExecStatsWrapper * node_stats)393 void StepStatsCollector::Save(const string& device,
394                               NodeExecStatsWrapper* node_stats) {
395   if (!node_stats) return;
396   VLOG(1) << "Save dev " << device << " node stats " << node_stats->stats();
397   {
398     mutex_lock l(mu_);
399     if (finalized_) {
400       LOG(WARNING) << "stats saved after finalize will not be collected.";
401     }
402     if (!step_stats_ || collected_nodes_ >= kMaxCollectedNodes) {
403       VLOG(1) << "step_stats_ nullptr or already collected too many nodes.";
404       delete node_stats;
405       return;
406     }
407     auto& device_stats = dev_stats_[device];
408     device_stats.push_back(std::unique_ptr<NodeExecStatsWrapper>(node_stats));
409     collected_nodes_++;
410   }
411 }
412 
SaveThreadName(const string & device,const uint32 thread_id,const string & thread_name)413 void StepStatsCollector::SaveThreadName(const string& device,
414                                         const uint32 thread_id,
415                                         const string& thread_name) {
416   VLOG(1) << "Save dev " << device << " thread id " << thread_id << " name "
417           << thread_name;
418   {
419     mutex_lock l(mu_);
420     if (finalized_) {
421       LOG(WARNING) << "thread_name saved after finalize will not be collected.";
422     }
423     auto& thread_names_map = thread_names_[device];
424     thread_names_map[thread_id] = thread_name;
425   }
426 }
427 
CreateNodeExecStats(const Node * node)428 NodeExecStatsInterface* StepStatsCollector::CreateNodeExecStats(
429     const Node* node) {
430   // Only collect statistics for non-transfer nodes.
431   if (IsSend(node) || IsRecv(node)) {
432     return nullptr;
433   }
434   return new NodeExecStatsWrapper(node, this);
435 }
436 
ReportAllocsOnResourceExhausted(const string & err)437 string StepStatsCollector::ReportAllocsOnResourceExhausted(const string& err) {
438   mutex_lock l(mu_);
439   if (err.find("OOM") == err.npos) {
440     return "";
441   }
442   // <device, allocator> -> AllocStats
443   std::map<std::pair<string, string>, AllocStats> allocs_map;
444   string report = "\n";
445   for (const auto& dev_stat : dev_stats_) {
446     const string& device = dev_stat.first;
447     // Only print the device that has OOM.
448     // TODO(xpan): Extract device from err first to speed it up.
449     if (err.find(device) == err.npos) {
450       continue;
451     }
452     // NodeExecStatsWrapper*
453     for (const auto& stats : dev_stat.second) {
454       // std::pair<AllocatorMemoryUsed*, TrackingAllocator*>
455       for (const auto& alloc : stats->allocations_) {
456         // Only print the allocator that has OOM.
457         // TODO(xpan): Extract device from err first to speed it up.
458         if (err.find(alloc.first->allocator_name()) == err.npos) {
459           continue;
460         }
461         auto dev_allocator =
462             std::make_pair(dev_stat.first, alloc.first->allocator_name());
463         AllocStats& dev_allocs_stats = allocs_map[dev_allocator];
464         TrackingAllocator* tracking_alloc = alloc.second;
465         gtl::InlinedVector<AllocRecord, 4> cur_records =
466             tracking_alloc->GetCurrentRecords();
467         int64 cur_bytes = 0;
468         for (const auto& r : cur_records) {
469           cur_bytes += r.alloc_bytes;
470         }
471         if (cur_bytes > 0) {
472           dev_allocs_stats.total_bytes += cur_bytes;
473           dev_allocs_stats.total_nodes++;
474           dev_allocs_stats.nodes_by_size[cur_bytes].push_back(
475               stats->stats()->node_name());
476         }
477       }
478     }
479   }
480 
481   for (const auto& dev_allocs_it : allocs_map) {
482     const auto& dev = dev_allocs_it.first;
483     const AllocStats& dev_allocs_stats = dev_allocs_it.second;
484     int64 reported_bytes = 0;
485     int64 reported_nodes = 0;
486     bool done = false;
487     strings::StrAppend(&report, "\nCurrent usage from device: ", dev.first,
488                        ", allocator: ", dev.second, "\n");
489     // Print allocations stats of the <device, allocator> pair.
490     for (auto it = dev_allocs_stats.nodes_by_size.rbegin();
491          it != dev_allocs_stats.nodes_by_size.rend(); ++it) {
492       for (const string& node_name : it->second) {
493         reported_bytes += it->first;
494         strings::StrAppend(&report, "  ",
495                            strings::HumanReadableNumBytes(it->first), " from ",
496                            node_name, "\n");
497         if (++reported_nodes > kMaxAllocReportNodes ||
498             reported_bytes >=
499                 dev_allocs_stats.total_bytes * kMaxAllocReportFraction) {
500           done = true;
501           break;
502         }
503       }
504       if (done) break;
505     }
506     int64 remain_nodes = dev_allocs_stats.total_nodes - reported_nodes;
507     int64 remain_bytes = dev_allocs_stats.total_bytes - reported_bytes;
508     if (remain_nodes > 0) {
509       strings::StrAppend(&report, "  Remaining ", remain_nodes, " nodes with ",
510                          strings::HumanReadableNumBytes(remain_bytes), "\n");
511     }
512   }
513   return report;
514 }
515 
Finalize()516 void StepStatsCollector::Finalize() {
517   mutex_lock l(mu_);
518   FinalizeInternal();
519 }
520 
FinalizeAndSwap(StepStats * step_stats)521 void StepStatsCollector::FinalizeAndSwap(StepStats* step_stats) {
522   mutex_lock l(mu_);
523   CHECK(step_stats_);
524   FinalizeInternal();
525   step_stats->Swap(step_stats_);
526   collected_nodes_ = 0;
527 }
528 
FinalizeInternal()529 void StepStatsCollector::FinalizeInternal() {
530   if (!step_stats_ || finalized_) {
531     return;
532   }
533   finalized_ = true;
534   std::map<string, DeviceStepStats*> dev_stats_pb;
535   for (auto& ds : *step_stats_->mutable_dev_stats()) {
536     dev_stats_pb[ds.device()] = &ds;
537   }
538   for (const auto& dev_stat : dev_stats_) {
539     if (dev_stats_pb.find(dev_stat.first) == dev_stats_pb.end()) {
540       DeviceStepStats* ndev_stat = step_stats_->add_dev_stats();
541       ndev_stat->set_device(dev_stat.first);
542       dev_stats_pb[dev_stat.first] = ndev_stat;
543     }
544     DeviceStepStats* dss = dev_stats_pb.at(dev_stat.first);
545     for (auto& stats : dev_stat.second) {
546       stats->Finalize();
547       stats->stats()->Swap(dss->add_node_stats());
548     }
549   }
550   for (const auto& device_thread : thread_names_) {
551     if (dev_stats_pb.find(device_thread.first) == dev_stats_pb.end()) {
552       // skip device without DeviceStepStats.
553       continue;
554     }
555     DeviceStepStats* dss = dev_stats_pb.at(device_thread.first);
556     for (const auto& thread_name : device_thread.second) {
557       (*dss->mutable_thread_names())[thread_name.first] = thread_name.second;
558     }
559   }
560 }
561 }  // namespace tensorflow
562