1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #define EIGEN_USE_THREADS
17 
18 #include "tensorflow/core/common_runtime/local_device.h"
19 
20 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
21 #include "tensorflow/core/common_runtime/process_state.h"
22 #include "tensorflow/core/common_runtime/process_util.h"
23 #include "tensorflow/core/lib/core/threadpool.h"
24 #include "tensorflow/core/platform/byte_order.h"
25 #include "tensorflow/core/platform/cpu_feature_guard.h"
26 #include "tensorflow/core/platform/cpu_info.h"
27 #include "tensorflow/core/platform/logging.h"
28 #include "tensorflow/core/platform/numa.h"
29 #include "tensorflow/core/platform/types.h"
30 #include "tensorflow/core/public/session_options.h"
31 #include "tensorflow/core/util/env_var.h"
32 
33 namespace tensorflow {
34 namespace {
35 
OverrideGlobalThreadPoolFromEnvironment()36 bool OverrideGlobalThreadPoolFromEnvironment() {
37   static const bool override_global_threadpool = [] {
38     bool flag;
39     auto status = ReadBoolFromEnvVar("TF_OVERRIDE_GLOBAL_THREADPOOL",
40                                      /*default_val=*/false, &flag);
41     if (!status.ok()) {
42       LOG(ERROR) << "OverrideGlobalThreadPool: " << status.error_message();
43       return false;
44     }
45     return flag;
46   }();
47   return override_global_threadpool;
48 }
49 
50 }  // namespace
51 
/* static */
// Definitions of LocalDevice's static data members.
//
// When true, the LocalDevice constructor takes its thread pool from
// global_tp_info_ instead of creating a pool owned by the device.
bool LocalDevice::use_global_threadpool_ = true;
// Held by the LocalDevice constructor while it reads or grows global_tp_info_.
mutex LocalDevice::global_tp_mu_;
// Shared thread-pool records. With NUMA affinity enabled the constructor
// indexes this by NUMA node; otherwise only slot 0 is used. Entries are
// allocated with `new` and never deleted anywhere in this file.
gtl::InlinedVector<LocalDevice::EigenThreadPoolInfo*, 4>
    LocalDevice::global_tp_info_;
57 
58 struct LocalDevice::EigenThreadPoolInfo {
59   // Wrapper so we can provide the CPUAllocator to Eigen for use
60   // when ops need extra tmp memory.
61   class EigenAllocator : public Eigen::Allocator {
62    public:
EigenAllocator(tensorflow::Allocator * a)63     explicit EigenAllocator(tensorflow::Allocator* a) : allocator_(a) {}
allocate(size_t num_bytes) const64     void* allocate(size_t num_bytes) const override {
65       return allocator_->AllocateRaw(64, num_bytes);
66     }
deallocate(void * buffer) const67     void deallocate(void* buffer) const override {
68       allocator_->DeallocateRaw(buffer);
69     }
70     tensorflow::Allocator* allocator_;
71   };
72 
EigenThreadPoolInfotensorflow::LocalDevice::EigenThreadPoolInfo73   explicit EigenThreadPoolInfo(const SessionOptions& options, int numa_node,
74                                Allocator* allocator) {
75     // Use session setting if specified.
76     int32 intra_op_parallelism_threads =
77         options.config.intra_op_parallelism_threads();
78     // If no session setting, use environment setting.
79     if (intra_op_parallelism_threads == 0) {
80       static int env_num_threads = NumIntraOpThreadsFromEnvironment();
81       intra_op_parallelism_threads = env_num_threads;
82       // If no session setting or environment, compute a reasonable default.
83       if (intra_op_parallelism_threads == 0) {
84         intra_op_parallelism_threads = port::MaxParallelism(numa_node);
85       }
86     }
87     ThreadOptions thread_opts;
88     thread_opts.numa_node = numa_node;
89     eigen_worker_threads_.num_threads = intra_op_parallelism_threads;
90     eigen_worker_threads_.workers = new thread::ThreadPool(
91         options.env, thread_opts, strings::StrCat("numa_", numa_node, "_Eigen"),
92         intra_op_parallelism_threads,
93         !options.config.experimental().disable_thread_spinning(),
94         /*allocator=*/nullptr);
95     Eigen::ThreadPoolInterface* threadpool =
96         eigen_worker_threads_.workers->AsEigenThreadPool();
97     if (allocator != nullptr) {
98       eigen_allocator_.reset(new EigenAllocator(allocator));
99     }
100     eigen_device_.reset(new Eigen::ThreadPoolDevice(
101         threadpool, eigen_worker_threads_.num_threads, eigen_allocator_.get()));
102   }
103 
~EigenThreadPoolInfotensorflow::LocalDevice::EigenThreadPoolInfo104   ~EigenThreadPoolInfo() {
105     eigen_device_.reset();
106     delete eigen_worker_threads_.workers;
107   }
108 
109   DeviceBase::CpuWorkerThreads eigen_worker_threads_;
110   std::unique_ptr<Eigen::ThreadPoolDevice> eigen_device_;
111   std::unique_ptr<EigenAllocator> eigen_allocator_;
112 };
113 
LocalDevice(const SessionOptions & options,const DeviceAttributes & attributes)114 LocalDevice::LocalDevice(const SessionOptions& options,
115                          const DeviceAttributes& attributes)
116     : Device(options.env, attributes), owned_tp_info_(nullptr) {
117   // Log info messages if TensorFlow is not compiled with instructions that
118   // could speed up performance and are available on the current CPU.
119   port::InfoAboutUnusedCPUFeatures();
120   LocalDevice::EigenThreadPoolInfo* tp_info;
121 
122   if (OverrideGlobalThreadPoolFromEnvironment()) {
123     set_use_global_threadpool(false);
124   }
125 
126   if (use_global_threadpool_) {
127     mutex_lock l(global_tp_mu_);
128     if (options.config.experimental().use_numa_affinity()) {
129       int numa_node = attributes.locality().numa_node();
130       int num_numa_nodes = port::NUMANumNodes();
131       DCHECK_LT(numa_node, num_numa_nodes);
132       Allocator* numa_allocator =
133           ProcessState::singleton()->GetCPUAllocator(numa_node);
134       while (numa_node >= global_tp_info_.size()) {
135         global_tp_info_.push_back(nullptr);
136       }
137       if (!global_tp_info_[numa_node]) {
138         global_tp_info_[numa_node] = new LocalDevice::EigenThreadPoolInfo(
139             options, numa_node, numa_allocator);
140       }
141       tp_info = global_tp_info_[numa_node];
142     } else {
143       if (global_tp_info_.empty()) {
144         global_tp_info_.push_back(new LocalDevice::EigenThreadPoolInfo(
145             options, port::kNUMANoAffinity, nullptr));
146       }
147       tp_info = global_tp_info_[0];
148     }
149   } else {
150     // Each LocalDevice owns a separate ThreadPoolDevice for numerical
151     // computations.
152     // TODO(tucker): NUMA for these too?
153     owned_tp_info_.reset(new LocalDevice::EigenThreadPoolInfo(
154         options, port::kNUMANoAffinity, nullptr));
155     tp_info = owned_tp_info_.get();
156   }
157   set_tensorflow_cpu_worker_threads(&tp_info->eigen_worker_threads_);
158   set_eigen_cpu_device(tp_info->eigen_device_.get());
159 }
160 
~LocalDevice()161 LocalDevice::~LocalDevice() {}
162 
163 }  // namespace tensorflow
164