1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/grappler/clusters/single_machine.h"
17 
18 #include <atomic>
19 #include <memory>
20 
21 #include "tensorflow/cc/training/queue_runner.h"
22 #include "tensorflow/core/common_runtime/device.h"
23 #include "tensorflow/core/common_runtime/device_mgr.h"
24 #include "tensorflow/core/common_runtime/gpu/gpu_id.h"
25 #include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
26 #include "tensorflow/core/grappler/clusters/utils.h"
27 #include "tensorflow/core/grappler/utils.h"
28 #include "tensorflow/core/kernels/ops_util.h"
29 #include "tensorflow/core/lib/core/errors.h"
30 #include "tensorflow/core/lib/strings/strcat.h"
31 #include "tensorflow/core/platform/mutex.h"
32 #include "tensorflow/core/platform/notification.h"
33 #include "tensorflow/core/platform/types.h"
34 #include "tensorflow/core/public/session.h"
35 
36 namespace tensorflow {
37 namespace grappler {
38 
// Process-wide flag recording whether a SingleMachine cluster is currently
// provisioned. Provision() refuses to run while it is set and sets it on
// success; Shutdown() clears it.
static std::atomic<bool> already_provisioned{false};
40 
SingleMachine(int timeout_s,int num_cpu_cores,int num_gpus)41 SingleMachine::SingleMachine(int timeout_s, int num_cpu_cores, int num_gpus)
42     : Cluster(timeout_s), expected_init_time_s_(0), closing_(false) {
43   VLOG(1) << "Number of CPU cores: " << num_cpu_cores
44           << " Number of GPUs: " << num_gpus;
45   thread_pool_.reset(new thread::ThreadPool(
46       Env::Default(), SanitizeThreadSuffix("single_machine"), 2));
47 
48   (*options_.config.mutable_device_count())["CPU"] = 1;
49   if (num_gpus > 0) {
50     (*options_.config.mutable_device_count())["GPU"] = num_gpus;
51   }
52   CHECK_GE(num_cpu_cores, 1);
53   options_.config.set_intra_op_parallelism_threads(num_cpu_cores);
54   // Create a session specific thread pool to ensure the threads are reset when
55   // the session is reset.
56   options_.config.add_session_inter_op_thread_pool()->set_num_threads(
57       num_cpu_cores);
58   if (timeout_s > 0) {
59     options_.config.set_operation_timeout_in_ms(timeout_s * 1000);
60   }
61 }
62 
~SingleMachine()63 SingleMachine::~SingleMachine() {
64   CloseSession(false /*use_timeout*/).IgnoreError();
65 
66   // Reset the thread-pool so that there are no outstanding Session::Run(...)s
67   // when we delete the session.
68   thread_pool_.reset();
69 }
70 
Provision()71 Status SingleMachine::Provision() {
72   // This is really ugly: to avoid leaking variables, we need to reset the tf
73   // session every time we're done processing a grappler item. However,
74   // variables are global, and therefore we can't have more than 1 session alive
75   // at a time. This check detects when more that one cluster is provisioned.
76   if (already_provisioned) {
77     return errors::Unavailable(
78         "Can't provision more than one single cluster at a time");
79   }
80 
81   TF_RETURN_IF_ERROR(ResetSession());
82 
83   std::vector<DeviceAttributes> devices;
84   TF_RETURN_IF_ERROR(session_->ListDevices(&devices));
85   for (const auto& dev : devices) {
86     DeviceProperties attr;
87     if (dev.device_type() == "CPU") {
88       attr = GetLocalCPUInfo();
89     } else if (dev.device_type() == "GPU") {
90       DeviceNameUtils::ParsedName parsed;
91       if (!DeviceNameUtils::ParseFullName(dev.name(), &parsed)) {
92         return errors::InvalidArgument(
93             strings::StrCat("Not able to parse GPU device name: ", dev.name()));
94       }
95       TfGpuId tf_gpu_id(parsed.id);
96       PlatformGpuId platform_gpu_id;
97       Status s = GpuIdManager::TfToPlatformGpuId(tf_gpu_id, &platform_gpu_id);
98       if (!s.ok()) {
99         return errors::Unavailable("Unknown TF GPU device with id ",
100                                    tf_gpu_id.value(), ": ", s.ToString());
101       }
102       attr = GetLocalGPUInfo(platform_gpu_id);
103     } else if (dev.device_type().find("XLA") == string::npos) {
104       // Filter out the fake XLA devices to avoid double counting the actual
105       // hardware resources that are available.
106       attr.set_type(dev.device_type());
107     }
108     // Overwrite the memory size since users might have requested to use only a
109     // fraction of the available device memory.
110     attr.set_memory_size(dev.memory_limit());
111     devices_[dev.name()] = attr;
112   }
113   already_provisioned = true;
114 
115   // Clear highmark stats of all local allocators.
116   if (cpu_allocator_stats_enabled_) {
117     TF_RETURN_IF_ERROR(ClearAllocatorStats());
118   }
119   return Status::OK();
120 }
121 
Initialize(const GrapplerItem & item)122 Status SingleMachine::Initialize(const GrapplerItem& item) {
123   mutex_lock l(this->last_graph_mu_);
124   if (last_graph_ != &item.graph || last_graph_id_ != item.id) {
125     init_ops_ = item.init_ops;
126     expected_init_time_s_ = item.expected_init_time;
127     last_graph_ = nullptr;
128     queue_runner_defs_ = item.queue_runners;
129     last_graph_id_ = item.id;
130   }
131   return Status::OK();
132 }
133 
Shutdown()134 Status SingleMachine::Shutdown() {
135   TF_RETURN_IF_ERROR(ShutdownSession());
136 
137   mutex_lock l(this->last_graph_mu_);
138   last_graph_ = nullptr;
139   already_provisioned = false;
140 
141   return Status::OK();
142 }
143 
Run(const GraphDef & graph_def,const std::vector<std::pair<string,Tensor>> & feed,const std::vector<string> & fetch,RunMetadata * metadata)144 Status SingleMachine::Run(const GraphDef& graph_def,
145                           const std::vector<std::pair<string, Tensor>>& feed,
146                           const std::vector<string>& fetch,
147                           RunMetadata* metadata) {
148   {
149     mutex_lock l(this->last_graph_mu_);
150     if (last_graph_ != &graph_def) {
151       TF_RETURN_IF_ERROR(ResetSession());
152       TF_RETURN_IF_ERROR(session_->Create(graph_def));
153       if (!init_ops_.empty()) {
154         init_metadata_ = RunMetadata();
155         int64 timeout_s = timeout_s_ + expected_init_time_s_;
156         TF_RETURN_IF_ERROR(
157             RunWithTimeout({}, init_ops_, &init_metadata_, timeout_s));
158         // The compute cost for init ops is likely to be pessimistic since init
159         // ops are run only once before warmup. Therefore we only keep their
160         // memory costs.
161         for (auto node : *init_metadata_.mutable_cost_graph()->mutable_node()) {
162           node.clear_compute_cost();
163         }
164         // Also clear the timeline to save memory
165         init_metadata_.clear_step_stats();
166       }
167       // We can have at most one hardware trace. Use it for the main graph, and
168       // downgrade tracing of the queue runners to a software trace.
169       RunOptions queue_options = run_options_;
170       if (queue_options.trace_level() >= RunOptions::HARDWARE_TRACE) {
171         queue_options.set_trace_level(RunOptions::SOFTWARE_TRACE);
172       }
173       for (size_t i = 0; i < queue_runner_defs_.size(); ++i) {
174         std::unique_ptr<QueueRunner> queue_runner;
175         TF_RETURN_IF_ERROR(QueueRunner::New(queue_runner_defs_[i],
176                                             coordinator_.get(), &queue_runner));
177 
178         TF_RETURN_IF_ERROR(queue_runner->StartAndCollectCostGraph(
179             session_.get(), queue_options));
180         TF_RETURN_IF_ERROR(
181             coordinator_->RegisterRunner(std::move(queue_runner)));
182         TF_RETURN_IF_ERROR(coordinator_->GetStatus());
183       }
184 
185       // Warmup TensorFlow if needed
186       for (int i = 0;
187            i < options_.config.graph_options().build_cost_model_after(); ++i) {
188         TF_RETURN_IF_ERROR(RunWithTimeout(feed, fetch, nullptr));
189       }
190 
191       last_graph_ = &graph_def;
192     }
193   }
194 
195   if (metadata) {
196     TF_RETURN_IF_ERROR(RunWithTimeout(feed, fetch, metadata));
197     // Merge the costs of the initialization and the queue runners.
198     CostGraphDef queue_costs;
199     TF_RETURN_IF_ERROR(coordinator_->ExportCostGraph(&queue_costs));
200     MergeCosts(metadata->mutable_cost_graph(), init_metadata_.cost_graph(),
201                queue_costs);
202   } else {
203     return RunWithTimeout(feed, fetch, nullptr);
204   }
205   return Status::OK();
206 }
207 
EnablePeakMemoryStats(bool enable)208 Status SingleMachine::EnablePeakMemoryStats(bool enable) {
209   EnableCPUAllocatorStats(enable);
210   cpu_allocator_stats_enabled_ = enable;
211   // No need to enable GPU allocator stats since its stats are always collected.
212   return Status::OK();
213 }
214 
GetPeakMemoryUsage(std::unordered_map<string,uint64> * device_peak_memory) const215 Status SingleMachine::GetPeakMemoryUsage(
216     std::unordered_map<string, uint64>* device_peak_memory) const {
217   // Cpu_allocator->TracksAllocationSizes() returns true doesn't always mean the
218   // the AllocatorStats would be collected.
219   if (!cpu_allocator_stats_enabled_) {
220     return Status(error::INVALID_ARGUMENT,
221                   "Tracking allocation for CPU is not enabled.");
222   }
223 
224   const DeviceMgr* device_mgr;
225   TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
226   std::vector<Device*> devices = device_mgr->ListDevices();
227 
228   device_peak_memory->clear();
229   for (Device* device : devices) {
230     auto* allocator = device->GetAllocator(AllocatorAttributes());
231     if (!allocator->TracksAllocationSizes()) {
232       return Status(error::INVALID_ARGUMENT,
233                     "Tracking allocation is not enabled.");
234     }
235     absl::optional<AllocatorStats> stats = allocator->GetStats();
236     (*device_peak_memory)[device->name()] =
237         (stats ? stats->peak_bytes_in_use : 0);
238   }
239 
240   return Status::OK();
241 }
242 
RunWithTimeout(const std::vector<std::pair<string,Tensor>> & feed,const std::vector<string> & fetch,RunMetadata * run_metadata)243 Status SingleMachine::RunWithTimeout(
244     const std::vector<std::pair<string, Tensor>>& feed,
245     const std::vector<string>& fetch, RunMetadata* run_metadata) {
246   return RunWithTimeout(feed, fetch, run_metadata, timeout_s_);
247 }
248 
RunWithTimeout(const std::vector<std::pair<string,Tensor>> & feed,const std::vector<string> & fetch,RunMetadata * run_metadata,int64 timeout_s)249 Status SingleMachine::RunWithTimeout(
250     const std::vector<std::pair<string, Tensor>>& feed,
251     const std::vector<string>& fetch, RunMetadata* run_metadata,
252     int64 timeout_s) {
253   // We shouldn't be running or closing the session at this point.
254   {
255     mutex_lock l(close_mu_);
256     CHECK(!closing_);
257   }
258 
259   auto status = std::make_shared<Status>();
260   auto local_metadata = std::make_shared<RunMetadata>();
261   const bool executed_in_time = ExecuteWithTimeout(
262       [this, status, local_metadata, feed, fetch]() {
263         *status = session_->Run(run_options_, feed, {}, fetch, nullptr,
264                                 local_metadata.get());
265       },
266       timeout_s * 1000, thread_pool_.get());
267   if (!executed_in_time) {
268     return errors::DeadlineExceeded("Failed to run the graph after ", timeout_s,
269                                     " seconds, aborting");
270   } else if (run_metadata && status->ok()) {
271     *run_metadata = *local_metadata;
272   }
273   return *status;
274 }
275 
CloseSession(bool use_timeout)276 Status SingleMachine::CloseSession(bool use_timeout) {
277   if (!session_ || !thread_pool_) {
278     return Status::OK();
279   }
280 
281   {
282     mutex_lock l(close_mu_);
283 
284     if (!closing_) {
285       closing_ = true;
286     }
287   }
288 
289   const bool executed_in_time = ExecuteWithTimeout(
290       [&]() {
291         if (this->coordinator_) {
292           this->coordinator_->RequestStop().IgnoreError();
293           // Wait for all the runners to have closed their queues.
294           while (!this->coordinator_->AllRunnersStopped()) {
295             sleep(1);
296           }
297           // Now we can close the session. This should cancel any pending I/O
298           // operation.
299           this->session_->Close().IgnoreError();
300           // Last but not least, we can delete the coordinator.
301           this->coordinator_.reset();
302         } else {
303           this->session_->Close().IgnoreError();
304         }
305 
306         mutex_lock l2(close_mu_);
307         closing_ = false;
308       },
309       use_timeout ? timeout_s_ * 1000 : -1, thread_pool_.get());
310 
311   if (!executed_in_time) {
312     // Let the caller know that we can't shutdown the session, and therefore
313     // can't process any further.
314     return errors::Unavailable("Failed to close the previous session after ",
315                                timeout_s_, " seconds, aborting");
316   }
317 
318   return Status::OK();
319 }
320 
ShutdownSession()321 Status SingleMachine::ShutdownSession() {
322   TF_RETURN_IF_ERROR(CloseSession(true /*use_timeout*/));
323 
324   // Delete the threadpool: this ensures that all the pending closures complete
325   // before we return. Note that if TF deadlocked on us, the closures will
326   // never complete, and the call to thread_pool_.reset() will never return:
327   // therefore we need to delete the threadpool with the background thread.
328   // That thread itself will also never complete, so the user should
329   // abort the process to avoid leaking too many resources.
330   auto n = std::make_shared<Notification>();
331   Env::Default()->SchedClosure([this, n]() {
332     thread_pool_.reset();
333     n->Notify();
334   });
335   int64 timeout_us = 1000000ll * timeout_s_;
336   const bool notified = WaitForNotificationWithTimeout(n.get(), timeout_us);
337   if (!notified) {
338     // Let the caller know that we can't shutdown the session properly since
339     // there are calls to Session::Run() still running.
340     return errors::Unavailable("The session is still running graphs after ",
341                                timeout_s_, " seconds");
342   }
343 
344   return Status::OK();
345 }
346 
ResetSession()347 Status SingleMachine::ResetSession() {
348   if (session_) {
349     LOG(INFO) << "Cleaning up previous session";
350 
351     // Make sure the session is properly closed
352     TF_RETURN_IF_ERROR(ShutdownSession());
353 
354     // Destroying the object deletes all its variables as well. This is only
355     // true for DirectSession.
356     session_.reset();
357   }
358 
359   LOG(INFO) << "Starting new session";
360 
361   // Create a new threadpool
362   thread_pool_.reset(new thread::ThreadPool(
363       Env::Default(), SanitizeThreadSuffix("single_machine"), 2));
364 
365   session_.reset(NewSession(options_));
366   if (!session_) {
367     return errors::Unknown("Failed to create session");
368   }
369   coordinator_.reset(new Coordinator());
370 
371   // Build the DeviceSet.
372   device_set_.reset(new DeviceSet);
373   const DeviceMgr* device_mgr;
374   TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
375   for (auto d : device_mgr->ListDevices()) {
376     device_set_->AddDevice(d);
377     // We currently don't care about the client device.
378   }
379 
380   return Status::OK();
381 }
382 
MergeCosts(CostGraphDef * graph_costs,const CostGraphDef & init_costs,const CostGraphDef & queue_costs)383 void SingleMachine::MergeCosts(CostGraphDef* graph_costs,
384                                const CostGraphDef& init_costs,
385                                const CostGraphDef& queue_costs) {
386   graph_costs->mutable_node()->Reserve(graph_costs->node_size() +
387                                        init_costs.node_size() +
388                                        queue_costs.node_size());
389   std::unordered_set<string> nodes_seen;
390   int queue_costs_id_offset = graph_costs->node_size();
391   for (const auto& node : graph_costs->node()) {
392     nodes_seen.insert(node.name());
393     if (node.id() >= queue_costs_id_offset) {
394       queue_costs_id_offset = node.id() + 1;
395     }
396   }
397 
398   int init_costs_id_offset = queue_costs_id_offset + queue_costs.node_size();
399   // The costs obtained by running the main graph could be more stable than
400   // the one we get from the queue runners since the queue runners run
401   // asynchronously.
402   for (const auto& node : queue_costs.node()) {
403     if (nodes_seen.find(node.name()) != nodes_seen.end()) {
404       continue;
405     }
406 
407     auto* new_node = graph_costs->add_node();
408     new_node->MergeFrom(node);
409 
410     new_node->set_id(node.id() + queue_costs_id_offset);
411     if (new_node->id() >= init_costs_id_offset) {
412       init_costs_id_offset = new_node->id() + 1;
413     }
414 
415     for (auto& input_info : *new_node->mutable_input_info()) {
416       input_info.set_preceding_node(input_info.preceding_node() +
417                                     queue_costs_id_offset);
418     }
419     for (auto& control_input : *new_node->mutable_control_input()) {
420       control_input += queue_costs_id_offset;
421     }
422   }
423 
424   // Don't overwrite the costs with that generated during initialization since
425   // these are possibly outdated.
426   for (const auto& node : init_costs.node()) {
427     if (nodes_seen.find(node.name()) != nodes_seen.end()) {
428       continue;
429     }
430 
431     auto* new_node = graph_costs->add_node();
432     new_node->MergeFrom(node);
433 
434     new_node->set_id(node.id() + init_costs_id_offset);
435     for (auto& input_info : *new_node->mutable_input_info()) {
436       input_info.set_preceding_node(input_info.preceding_node() +
437                                     init_costs_id_offset);
438     }
439     for (auto& control_input : *new_node->mutable_control_input()) {
440       control_input += init_costs_id_offset;
441     }
442   }
443 }
444 
ClearAllocatorStats() const445 Status SingleMachine::ClearAllocatorStats() const {
446   // Cpu_allocator->TracksAllocationSizes() returns true doesn't always mean the
447   // the AllocatorStats would be collected.
448   if (!cpu_allocator_stats_enabled_) {
449     return Status(error::INVALID_ARGUMENT,
450                   "Tracking allocation for CPU is not enabled.");
451   }
452 
453   const DeviceMgr* device_mgr;
454   TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
455   std::vector<Device*> devices = device_mgr->ListDevices();
456 
457   for (Device* device : devices) {
458     auto* allocator = device->GetAllocator(AllocatorAttributes());
459     if (!allocator->TracksAllocationSizes()) {
460       return Status(error::INVALID_ARGUMENT,
461                     "Tracking allocation is not enabled.");
462     }
463     allocator->ClearStats();
464   }
465   return Status::OK();
466 }
467 
468 }  // namespace grappler
469 }  // namespace tensorflow
470