/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/grappler/clusters/single_machine.h"

#include <atomic>
#include <memory>

#include "tensorflow/cc/training/queue_runner.h"
#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/device_mgr.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
#include "tensorflow/core/grappler/clusters/utils.h"
#include "tensorflow/core/grappler/utils.h"
#include "tensorflow/core/kernels/ops_util.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/notification.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session.h"

namespace tensorflow {
namespace grappler {

static std::atomic<bool> already_provisioned(false);

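// A hypothetical usage sketch of the class implemented below (illustrative
// caller code, not part of this file; error handling via TF_CHECK_OK for
// brevity, and the constructor arguments are made up):
//
//   SingleMachine cluster(/*timeout_s=*/60, /*num_cpu_cores=*/4,
//                         /*num_gpus=*/1);
//   TF_CHECK_OK(cluster.Provision());    // at most one cluster at a time
//   TF_CHECK_OK(cluster.Initialize(item));
//   RunMetadata metadata;
//   TF_CHECK_OK(cluster.Run(item.graph, item.feed, item.fetch, &metadata));
//   TF_CHECK_OK(cluster.Shutdown());     // lets another cluster provision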
SingleMachine::SingleMachine(int timeout_s, int num_cpu_cores, int num_gpus)
    : Cluster(timeout_s), expected_init_time_s_(0), closing_(false) {
  VLOG(1) << "Number of CPU cores: " << num_cpu_cores
          << " Number of GPUs: " << num_gpus;
  thread_pool_.reset(new thread::ThreadPool(
      Env::Default(), SanitizeThreadSuffix("single_machine"), 2));

  (*options_.config.mutable_device_count())["CPU"] = 1;
  if (num_gpus > 0) {
    (*options_.config.mutable_device_count())["GPU"] = num_gpus;
  }
  CHECK_GE(num_cpu_cores, 1);
  options_.config.set_intra_op_parallelism_threads(num_cpu_cores);
  // Create a session-specific thread pool to ensure the threads are reset when
  // the session is reset.
  options_.config.add_session_inter_op_thread_pool()->set_num_threads(
      num_cpu_cores);
  if (timeout_s > 0) {
    options_.config.set_operation_timeout_in_ms(timeout_s * 1000);
  }
}

SingleMachine::~SingleMachine() {
  CloseSession(false /*use_timeout*/).IgnoreError();

  // Reset the thread pool so that there are no outstanding Session::Run(...)
  // calls when we delete the session.
  thread_pool_.reset();
}

Status SingleMachine::Provision() {
  // This is really ugly: to avoid leaking variables, we need to reset the tf
  // session every time we're done processing a grappler item. However,
  // variables are global, and therefore we can't have more than one session
  // alive at a time. This check detects when more than one cluster is
  // provisioned.
  if (already_provisioned) {
    return errors::Unavailable(
        "Can't provision more than one single cluster at a time");
  }

  TF_RETURN_IF_ERROR(ResetSession());

  std::vector<DeviceAttributes> devices;
  TF_RETURN_IF_ERROR(session_->ListDevices(&devices));
  for (const auto& dev : devices) {
    DeviceProperties attr;
    if (dev.device_type() == "CPU") {
      attr = GetLocalCPUInfo();
    } else if (dev.device_type() == "GPU") {
      DeviceNameUtils::ParsedName parsed;
      if (!DeviceNameUtils::ParseFullName(dev.name(), &parsed)) {
        return errors::InvalidArgument(
            strings::StrCat("Not able to parse GPU device name: ", dev.name()));
      }
      TfGpuId tf_gpu_id(parsed.id);
      PlatformGpuId platform_gpu_id;
      Status s = GpuIdManager::TfToPlatformGpuId(tf_gpu_id, &platform_gpu_id);
      if (!s.ok()) {
        return errors::Unavailable("Unknown TF GPU device with id ",
                                   tf_gpu_id.value(), ": ", s.ToString());
      }
      attr = GetLocalGPUInfo(platform_gpu_id);
    } else if (dev.device_type().find("XLA") == string::npos) {
      // Filter out the fake XLA devices to avoid double counting the actual
      // hardware resources that are available.
      attr.set_type(dev.device_type());
    }
    // Overwrite the memory size since users might have requested to use only a
    // fraction of the available device memory.
    attr.set_memory_size(dev.memory_limit());
    devices_[dev.name()] = attr;
  }
  already_provisioned = true;

  // Clear the high-water-mark stats of all local allocators.
  if (cpu_allocator_stats_enabled_) {
    TF_RETURN_IF_ERROR(ClearAllocatorStats());
  }
  return Status::OK();
}

Status SingleMachine::Initialize(const GrapplerItem& item) {
  mutex_lock l(this->last_graph_mu_);
  if (last_graph_ != &item.graph || last_graph_id_ != item.id) {
    init_ops_ = item.init_ops;
    expected_init_time_s_ = item.expected_init_time;
    last_graph_ = nullptr;
    queue_runner_defs_ = item.queue_runners;
    last_graph_id_ = item.id;
  }
  return Status::OK();
}

Status SingleMachine::Shutdown() {
  TF_RETURN_IF_ERROR(ShutdownSession());

  mutex_lock l(this->last_graph_mu_);
  last_graph_ = nullptr;
  already_provisioned = false;

  return Status::OK();
}

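// Runs the given graph with the given feeds and fetches on the underlying
// session. The last graph is cached by pointer identity: re-running the same
// GraphDef skips session creation, init ops, queue-runner startup, and
// warmup, so only the first call on a new graph pays the initialization cost.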
Status SingleMachine::Run(const GraphDef& graph_def,
                          const std::vector<std::pair<string, Tensor>>& feed,
                          const std::vector<string>& fetch,
                          RunMetadata* metadata) {
  {
    mutex_lock l(this->last_graph_mu_);
    if (last_graph_ != &graph_def) {
      TF_RETURN_IF_ERROR(ResetSession());
      TF_RETURN_IF_ERROR(session_->Create(graph_def));
      if (!init_ops_.empty()) {
        init_metadata_ = RunMetadata();
        int64 timeout_s = timeout_s_ + expected_init_time_s_;
        TF_RETURN_IF_ERROR(
            RunWithTimeout({}, init_ops_, &init_metadata_, timeout_s));
        // The compute cost for init ops is likely to be pessimistic since init
        // ops are run only once before warmup. Therefore we only keep their
        // memory costs. Note that the node must be taken by reference here;
        // iterating by value would clear the compute cost on a copy.
        for (auto& node : *init_metadata_.mutable_cost_graph()->mutable_node()) {
          node.clear_compute_cost();
        }
        // Also clear the timeline to save memory.
        init_metadata_.clear_step_stats();
      }
      // We can have at most one hardware trace. Use it for the main graph, and
      // downgrade tracing of the queue runners to a software trace.
      RunOptions queue_options = run_options_;
      if (queue_options.trace_level() >= RunOptions::HARDWARE_TRACE) {
        queue_options.set_trace_level(RunOptions::SOFTWARE_TRACE);
      }
      for (size_t i = 0; i < queue_runner_defs_.size(); ++i) {
        std::unique_ptr<QueueRunner> queue_runner;
        TF_RETURN_IF_ERROR(QueueRunner::New(queue_runner_defs_[i],
                                            coordinator_.get(), &queue_runner));

        TF_RETURN_IF_ERROR(queue_runner->StartAndCollectCostGraph(
            session_.get(), queue_options));
        TF_RETURN_IF_ERROR(
            coordinator_->RegisterRunner(std::move(queue_runner)));
        TF_RETURN_IF_ERROR(coordinator_->GetStatus());
      }

      // Warm up TensorFlow if needed.
      for (int i = 0;
           i < options_.config.graph_options().build_cost_model_after(); ++i) {
        TF_RETURN_IF_ERROR(RunWithTimeout(feed, fetch, nullptr));
      }

      last_graph_ = &graph_def;
    }
  }

  if (metadata) {
    TF_RETURN_IF_ERROR(RunWithTimeout(feed, fetch, metadata));
    // Merge the costs of the initialization and the queue runners.
    CostGraphDef queue_costs;
    TF_RETURN_IF_ERROR(coordinator_->ExportCostGraph(&queue_costs));
    MergeCosts(metadata->mutable_cost_graph(), init_metadata_.cost_graph(),
               queue_costs);
  } else {
    return RunWithTimeout(feed, fetch, nullptr);
  }
  return Status::OK();
}

Status SingleMachine::EnablePeakMemoryStats(bool enable) {
  EnableCPUAllocatorStats(enable);
  cpu_allocator_stats_enabled_ = enable;
  // No need to enable GPU allocator stats since they are always collected.
  return Status::OK();
}

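// A minimal sketch of how peak-memory tracking is meant to be used
// (hypothetical caller code; stats are enabled up front so the CPU allocator
// tracks allocation sizes before the session allocates anything):
//
//   TF_CHECK_OK(cluster.EnablePeakMemoryStats(true));
//   TF_CHECK_OK(cluster.Provision());
//   TF_CHECK_OK(cluster.Run(item.graph, item.feed, item.fetch, nullptr));
//   std::unordered_map<string, uint64> peak_mem;
//   TF_CHECK_OK(cluster.GetPeakMemoryUsage(&peak_mem));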
Status SingleMachine::GetPeakMemoryUsage(
    std::unordered_map<string, uint64>* device_peak_memory) const {
  // Note that cpu_allocator->TracksAllocationSizes() returning true doesn't
  // guarantee that AllocatorStats are actually being collected, so check our
  // own flag first.
  if (!cpu_allocator_stats_enabled_) {
    return Status(error::INVALID_ARGUMENT,
                  "Tracking allocation for CPU is not enabled.");
  }

  const DeviceMgr* device_mgr;
  TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
  std::vector<Device*> devices = device_mgr->ListDevices();

  device_peak_memory->clear();
  for (Device* device : devices) {
    auto* allocator = device->GetAllocator(AllocatorAttributes());
    if (!allocator->TracksAllocationSizes()) {
      return Status(error::INVALID_ARGUMENT,
                    "Tracking allocation is not enabled.");
    }
    absl::optional<AllocatorStats> stats = allocator->GetStats();
    (*device_peak_memory)[device->name()] =
        (stats ? stats->peak_bytes_in_use : 0);
  }

  return Status::OK();
}

Status SingleMachine::RunWithTimeout(
    const std::vector<std::pair<string, Tensor>>& feed,
    const std::vector<string>& fetch, RunMetadata* run_metadata) {
  return RunWithTimeout(feed, fetch, run_metadata, timeout_s_);
}

Status SingleMachine::RunWithTimeout(
    const std::vector<std::pair<string, Tensor>>& feed,
    const std::vector<string>& fetch, RunMetadata* run_metadata,
    int64 timeout_s) {
  // We shouldn't be running or closing the session at this point.
  {
    mutex_lock l(close_mu_);
    CHECK(!closing_);
  }

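  // status and local_metadata are held via shared_ptr so that, if the timeout
  // fires, the closure below can safely outlive this stack frame and still
  // write to live objects whenever the hung Session::Run() finally returns.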
  auto status = std::make_shared<Status>();
  auto local_metadata = std::make_shared<RunMetadata>();
  const bool executed_in_time = ExecuteWithTimeout(
      [this, status, local_metadata, feed, fetch]() {
        *status = session_->Run(run_options_, feed, {}, fetch, nullptr,
                                local_metadata.get());
      },
      timeout_s * 1000, thread_pool_.get());
  if (!executed_in_time) {
    return errors::DeadlineExceeded("Failed to run the graph after ", timeout_s,
                                    " seconds, aborting");
  } else if (run_metadata && status->ok()) {
    *run_metadata = *local_metadata;
  }
  return *status;
}

Status SingleMachine::CloseSession(bool use_timeout) {
  if (!session_ || !thread_pool_) {
    return Status::OK();
  }

  {
    mutex_lock l(close_mu_);

    if (!closing_) {
      closing_ = true;
    }
  }

  const bool executed_in_time = ExecuteWithTimeout(
      [&]() {
        if (this->coordinator_) {
          this->coordinator_->RequestStop().IgnoreError();
          // Wait for all the runners to have closed their queues.
          while (!this->coordinator_->AllRunnersStopped()) {
            sleep(1);
          }
          // Now we can close the session. This should cancel any pending I/O
          // operation.
          this->session_->Close().IgnoreError();
          // Last but not least, we can delete the coordinator.
          this->coordinator_.reset();
        } else {
          this->session_->Close().IgnoreError();
        }

        mutex_lock l2(close_mu_);
        closing_ = false;
      },
      use_timeout ? timeout_s_ * 1000 : -1, thread_pool_.get());

  if (!executed_in_time) {
    // Let the caller know that we can't shut down the session, and therefore
    // can't process anything further.
    return errors::Unavailable("Failed to close the previous session after ",
                               timeout_s_, " seconds, aborting");
  }

  return Status::OK();
}

Status SingleMachine::ShutdownSession() {
  TF_RETURN_IF_ERROR(CloseSession(true /*use_timeout*/));

  // Delete the threadpool: this ensures that all the pending closures complete
  // before we return. Note that if TF deadlocked on us, the closures will
  // never complete, and the call to thread_pool_.reset() will never return:
  // therefore we need to delete the threadpool from a background thread. That
  // background thread itself will also never complete, so the user should
  // abort the process to avoid leaking too many resources.
  auto n = std::make_shared<Notification>();
  Env::Default()->SchedClosure([this, n]() {
    thread_pool_.reset();
    n->Notify();
  });
  int64 timeout_us = 1000000ll * timeout_s_;
  const bool notified = WaitForNotificationWithTimeout(n.get(), timeout_us);
  if (!notified) {
    // Let the caller know that we can't shut down the session properly since
    // there are calls to Session::Run() still running.
    return errors::Unavailable("The session is still running graphs after ",
                               timeout_s_, " seconds");
  }

  return Status::OK();
}

Status SingleMachine::ResetSession() {
  if (session_) {
    LOG(INFO) << "Cleaning up previous session";

    // Make sure the session is properly closed.
    TF_RETURN_IF_ERROR(ShutdownSession());

    // Destroying the object deletes all its variables as well. This is only
    // true for DirectSession.
    session_.reset();
  }

  LOG(INFO) << "Starting new session";

  // Create a new thread pool.
  thread_pool_.reset(new thread::ThreadPool(
      Env::Default(), SanitizeThreadSuffix("single_machine"), 2));

  session_.reset(NewSession(options_));
  if (!session_) {
    return errors::Unknown("Failed to create session");
  }
  coordinator_.reset(new Coordinator());

  // Build the DeviceSet.
  device_set_.reset(new DeviceSet);
  const DeviceMgr* device_mgr;
  TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
  for (auto d : device_mgr->ListDevices()) {
    device_set_->AddDevice(d);
    // We currently don't care about the client device.
  }

  return Status::OK();
}

void SingleMachine::MergeCosts(CostGraphDef* graph_costs,
                               const CostGraphDef& init_costs,
                               const CostGraphDef& queue_costs) {
  graph_costs->mutable_node()->Reserve(graph_costs->node_size() +
                                       init_costs.node_size() +
                                       queue_costs.node_size());
  std::unordered_set<string> nodes_seen;
  int queue_costs_id_offset = graph_costs->node_size();
  for (const auto& node : graph_costs->node()) {
    nodes_seen.insert(node.name());
    if (node.id() >= queue_costs_id_offset) {
      queue_costs_id_offset = node.id() + 1;
    }
  }

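  // A worked example with hypothetical ids: if graph_costs contains nodes
  // with ids {0, 1, 7}, the loop above leaves queue_costs_id_offset == 8. A
  // queue node with id 2 is then re-inserted below with id 2 + 8 == 10, and
  // its input and control-input references are shifted by the same amount,
  // which keeps all ids in the merged cost graph unique and consistent.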
  int init_costs_id_offset = queue_costs_id_offset + queue_costs.node_size();
  // The costs obtained by running the main graph could be more stable than
  // the ones we get from the queue runners, since the queue runners run
  // asynchronously.
  for (const auto& node : queue_costs.node()) {
    if (nodes_seen.find(node.name()) != nodes_seen.end()) {
      continue;
    }

    auto* new_node = graph_costs->add_node();
    new_node->MergeFrom(node);

    new_node->set_id(node.id() + queue_costs_id_offset);
    if (new_node->id() >= init_costs_id_offset) {
      init_costs_id_offset = new_node->id() + 1;
    }

    for (auto& input_info : *new_node->mutable_input_info()) {
      input_info.set_preceding_node(input_info.preceding_node() +
                                    queue_costs_id_offset);
    }
    for (auto& control_input : *new_node->mutable_control_input()) {
      control_input += queue_costs_id_offset;
    }
  }

  // Don't overwrite the costs with those generated during initialization,
  // since these are possibly outdated.
  for (const auto& node : init_costs.node()) {
    if (nodes_seen.find(node.name()) != nodes_seen.end()) {
      continue;
    }

    auto* new_node = graph_costs->add_node();
    new_node->MergeFrom(node);

    new_node->set_id(node.id() + init_costs_id_offset);
    for (auto& input_info : *new_node->mutable_input_info()) {
      input_info.set_preceding_node(input_info.preceding_node() +
                                    init_costs_id_offset);
    }
    for (auto& control_input : *new_node->mutable_control_input()) {
      control_input += init_costs_id_offset;
    }
  }
}

Status SingleMachine::ClearAllocatorStats() const {
  // Note that cpu_allocator->TracksAllocationSizes() returning true doesn't
  // guarantee that AllocatorStats are actually being collected, so check our
  // own flag first.
  if (!cpu_allocator_stats_enabled_) {
    return Status(error::INVALID_ARGUMENT,
                  "Tracking allocation for CPU is not enabled.");
  }

  const DeviceMgr* device_mgr;
  TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
  std::vector<Device*> devices = device_mgr->ListDevices();

  for (Device* device : devices) {
    auto* allocator = device->GetAllocator(AllocatorAttributes());
    if (!allocator->TracksAllocationSizes()) {
      return Status(error::INVALID_ARGUMENT,
                    "Tracking allocation is not enabled.");
    }
    allocator->ClearStats();
  }
  return Status::OK();
}

}  // namespace grappler
}  // namespace tensorflow