1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/common_runtime/process_util.h"
17 
18 #ifdef INTEL_MKL
19 #ifdef _OPENMP
20 #include <omp.h>
21 #endif  // _OPENMP
22 #endif  // INTEL_MKL
23 #include <string.h>
24 
25 #include "tensorflow/core/lib/core/threadpool.h"
26 #include "tensorflow/core/platform/byte_order.h"
27 #include "tensorflow/core/platform/cpu_info.h"
28 #include "tensorflow/core/platform/logging.h"
29 #include "tensorflow/core/platform/tracing.h"
30 #include "tensorflow/core/platform/types.h"
31 #include "tensorflow/core/util/util.h"
32 
33 namespace tensorflow {
34 
35 namespace {
36 
37 // Use environment setting if specified (init once)
GetEnvNumInterOpThreads()38 int32 GetEnvNumInterOpThreads() {
39   static int32 env_num_threads = NumInterOpThreadsFromEnvironment();
40   return env_num_threads;
41 }
42 
DefaultNumInterOpThreads()43 int32 DefaultNumInterOpThreads() {
44 #ifndef __ANDROID__
45   int32 env_num_threads = GetEnvNumInterOpThreads();
46   if (env_num_threads > 0) {
47     return env_num_threads;
48   }
49 
50   // Default to the maximum parallelism for the current process.
51   return port::MaxParallelism();
52 #else
53   // Historically, -D__ANDROID__ resulted in the inter-op threadpool not being
54   // used (regardless of what was chosen here); instead, all work was done on
55   // the thread(s) calling Session::Run. That's no longer the case, but we'd
56   // like to avoid suddenly higher concurrency and peak resource usage (for the
57   // same device shape, graph, and options) versus prior versions - as best we
58   // can:
59   //
60   //   - Single Session::Run (none concurrent), and default options:
61   //     Behavior is mostly the same as before.
62   //
63   //   - Concurrent Session::Runs, and default options:
64   //     Reduced concurrency versus before.
65   //
66   //   - Thread-pool size set explicitly (>1):
67   //     Increased concurrency versus before.
68   //
69   // (We assume the first case is the most common)
70   return 1;
71 #endif
72 }
73 
InitComputePool(const SessionOptions & options)74 static thread::ThreadPool* InitComputePool(const SessionOptions& options) {
75   int32 inter_op_parallelism_threads =
76       options.config.inter_op_parallelism_threads();
77   if (inter_op_parallelism_threads == 0) {
78     inter_op_parallelism_threads = DefaultNumInterOpThreads();
79   }
80   return new thread::ThreadPool(
81       Env::Default(), ThreadOptions(), "Compute", inter_op_parallelism_threads,
82       !options.config.experimental().disable_thread_spinning(),
83       /*allocator=*/nullptr);
84 }
85 
86 }  // namespace
87 
ComputePool(const SessionOptions & options)88 thread::ThreadPool* ComputePool(const SessionOptions& options) {
89   static thread::ThreadPool* compute_pool = InitComputePool(options);
90   return compute_pool;
91 }
92 
NumInterOpThreadsFromEnvironment()93 int32 NumInterOpThreadsFromEnvironment() {
94   int32 num;
95   const char* val = std::getenv("TF_NUM_INTEROP_THREADS");
96   return (val && strings::safe_strto32(val, &num)) ? num : 0;
97 }
98 
NumIntraOpThreadsFromEnvironment()99 int32 NumIntraOpThreadsFromEnvironment() {
100   int32 num;
101   const char* val = std::getenv("TF_NUM_INTRAOP_THREADS");
102   return (val && strings::safe_strto32(val, &num)) ? num : 0;
103 }
104 #if !defined(ENABLE_MKLDNN_THREADPOOL) && defined(INTEL_MKL)
OMPThreadsFromEnvironment()105 int32 OMPThreadsFromEnvironment() {
106   // 1) std::getenv is thread-safe (as long as no other function modifies the
107   // host env) from C++11 onward. 2) Most of TF code (except tests and
108   // experimental code) doesn't call setenv and unsetenv
109   int32 num;
110   const char* val = std::getenv("OMP_NUM_THREADS");
111   return (val && strings::safe_strto32(val, &num)) ? num : 0;
112 }
113 
DefaultNumIntraOpThreads()114 int32 DefaultNumIntraOpThreads() {
115   // Use environment setting if specified (init once)
116   static int env_num_threads = NumIntraOpThreadsFromEnvironment();
117   if (env_num_threads > 0) {
118     return env_num_threads;
119   }
120 
121   // Default to the maximum parallelism for the current process.
122   return port::MaxParallelism();
123 }
124 #endif  // !defined(ENABLE_MKLDNN_THREADPOOL) && defined(INTEL_MKL)
NumInterOpThreadsFromSessionOptions(const SessionOptions & options)125 int32 NumInterOpThreadsFromSessionOptions(const SessionOptions& options) {
126   const int32 inter_op = options.config.inter_op_parallelism_threads();
127   if (inter_op > 0) return inter_op;
128   const int32 env_inter_op = GetEnvNumInterOpThreads();
129   if (env_inter_op > 0) return env_inter_op;
130 
131 #if !defined(ENABLE_MKLDNN_THREADPOOL) && defined(INTEL_MKL)
132   if (!DisableMKL()) {
133     // MKL library executes ops in parallel using OMP threads.
134     // Setting inter_op conservatively to avoid thread oversubscription that
135     // could lead to severe perf degradations and OMP resource exhaustion.
136     // Inter ops are set such that mkl_inter_op * mkl_intra_op <= NumCores.
137     const int32 intra_op = options.config.intra_op_parallelism_threads();
138     const int32 omp_max_threads = OMPThreadsFromEnvironment();
139     const int32 mkl_intra_op =
140         (omp_max_threads > 0)
141             ? omp_max_threads
142             : (intra_op > 0) ? intra_op : DefaultNumIntraOpThreads();
143     DCHECK_GE(mkl_intra_op, 1);
144     const int32 mkl_inter_op = std::max(
145         (DefaultNumInterOpThreads() + mkl_intra_op - 1) / mkl_intra_op, 2);
146     VLOG(0)
147         << "Creating new thread pool with default inter op setting: "
148         << mkl_inter_op
149         << ". Tune using inter_op_parallelism_threads for best performance.";
150     return mkl_inter_op;
151   }
152 #endif  // !defined(ENABLE_MKLDNN_THREADPOOL) && defined(INTEL_MKL)
153   return DefaultNumInterOpThreads();
154 }
155 
NewThreadPoolFromSessionOptions(const SessionOptions & options)156 thread::ThreadPool* NewThreadPoolFromSessionOptions(
157     const SessionOptions& options) {
158   const int32 num_threads = NumInterOpThreadsFromSessionOptions(options);
159   VLOG(1) << "Direct session inter op parallelism threads: " << num_threads;
160   return new thread::ThreadPool(
161       options.env, ThreadOptions(), "Compute", num_threads,
162       !options.config.experimental().disable_thread_spinning(),
163       /*allocator=*/nullptr);
164 }
165 
SchedClosure(std::function<void ()> closure)166 void SchedClosure(std::function<void()> closure) {
167   if (!tracing::EventCollector::IsEnabled()) {
168     return Env::Default()->SchedClosure(std::move(closure));
169   }
170   uint64 id = tracing::GetUniqueArg();
171   tracing::RecordEvent(tracing::EventCategory::kScheduleClosure, id);
172 
173   Env::Default()->SchedClosure([id, closure = std::move(closure)]() {
174     tracing::ScopedRegion region(tracing::EventCategory::kRunClosure, id);
175     closure();
176   });
177 }
178 
SchedNonBlockingClosureAfter(int64 micros,std::function<void ()> closure)179 void SchedNonBlockingClosureAfter(int64 micros, std::function<void()> closure) {
180   Env::Default()->SchedClosureAfter(micros, std::move(closure));
181 }
182 
183 }  // namespace tensorflow
184