/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_
17 #define TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_
18 
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/types/optional.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/threadpool_interface.h"
#include "tensorflow/core/platform/types.h"
27 
// Forward declarations of the Eigen types that this header only refers to by
// pointer or as a std::unique_ptr / template argument, so their full
// definitions are not needed here.
namespace Eigen {
class Allocator;
class ThreadPoolInterface;
struct ThreadPoolDevice;

// Thread pool implementation parameterized on the environment type it uses.
template <typename Environment>
class ThreadPoolTempl;
}  // namespace Eigen
36 
37 namespace tensorflow {
38 namespace thread {
39 
40 struct EigenEnvironment;
41 
42 class ThreadPool {
43  public:
44   // Scheduling strategies for ParallelFor. The strategy governs how the given
45   // units of work are distributed among the available threads in the
46   // threadpool.
47   enum class SchedulingStrategy {
48     // The Adaptive scheduling strategy adaptively chooses the shard sizes based
49     // on the cost of each unit of work, and the cost model of the underlying
50     // threadpool device.
51     //
52     // The 'cost_per_unit' is an estimate of the number of CPU cycles (or
53     // nanoseconds if not CPU-bound) to complete a unit of work. Overestimating
54     // creates too many shards and CPU time will be dominated by per-shard
55     // overhead, such as Context creation. Underestimating may not fully make
56     // use of the specified parallelism, and may also cause inefficiencies due
57     // to load balancing issues and stragglers.
58     kAdaptive,
59     // The Fixed Block Size scheduling strategy shards the given units of work
60     // into shards of fixed size. In case the total number of units is not
61     // evenly divisible by 'block_size', at most one of the shards may be of
62     // smaller size. The exact number of shards may be found by a call to
63     // NumShardsUsedByFixedBlockSizeScheduling.
64     //
65     // Each shard may be executed on a different thread in parallel, depending
66     // on the number of threads available in the pool. Note that when there
67     // aren't enough threads in the pool to achieve full parallelism, function
68     // calls will be automatically queued.
69     kFixedBlockSize
70   };
71 
72   // Contains additional parameters for either the Adaptive or the Fixed Block
73   // Size scheduling strategy.
74   class SchedulingParams {
75    public:
SchedulingParams(SchedulingStrategy strategy,absl::optional<int64> cost_per_unit,absl::optional<int64> block_size)76     explicit SchedulingParams(SchedulingStrategy strategy,
77                               absl::optional<int64> cost_per_unit,
78                               absl::optional<int64> block_size)
79         : strategy_(strategy),
80           cost_per_unit_(cost_per_unit),
81           block_size_(block_size) {}
82 
strategy()83     SchedulingStrategy strategy() const { return strategy_; }
cost_per_unit()84     absl::optional<int64> cost_per_unit() const { return cost_per_unit_; }
block_size()85     absl::optional<int64> block_size() const { return block_size_; }
86 
87    private:
88     // The underlying Scheduling Strategy for which this instance contains
89     // additional parameters.
90     SchedulingStrategy strategy_;
91 
92     // The estimated cost per unit of work in number of CPU cycles (or
93     // nanoseconds if not CPU-bound). Only applicable for Adaptive scheduling
94     // strategy.
95     absl::optional<int64> cost_per_unit_;
96 
97     // The block size of each shard. Only applicable for Fixed Block Size
98     // scheduling strategy.
99     absl::optional<int64> block_size_;
100   };
101 
102   // Constructs a pool that contains "num_threads" threads with specified
103   // "name". env->StartThread() is used to create individual threads with the
104   // given ThreadOptions. If "low_latency_hint" is true the thread pool
105   // implementation may use it as a hint that lower latency is preferred at the
106   // cost of higher CPU usage, e.g. by letting one or more idle threads spin
107   // wait. Conversely, if the threadpool is used to schedule high-latency
108   // operations like I/O the hint should be set to false.
109   //
110   // REQUIRES: num_threads > 0
111   ThreadPool(Env* env, const ThreadOptions& thread_options,
112              const std::string& name, int num_threads, bool low_latency_hint,
113              Eigen::Allocator* allocator = nullptr);
114 
115   // Constructs a pool for low-latency ops that contains "num_threads" threads
116   // with specified "name". env->StartThread() is used to create individual
117   // threads.
118   // REQUIRES: num_threads > 0
119   ThreadPool(Env* env, const std::string& name, int num_threads);
120 
121   // Constructs a pool for low-latency ops that contains "num_threads" threads
122   // with specified "name". env->StartThread() is used to create individual
123   // threads with the given ThreadOptions.
124   // REQUIRES: num_threads > 0
125   ThreadPool(Env* env, const ThreadOptions& thread_options,
126              const std::string& name, int num_threads);
127 
128   // Constructs a pool that wraps around the thread::ThreadPoolInterface
129   // instance provided by the caller. Caller retains ownership of
130   // `user_threadpool` and must ensure its lifetime is longer than the
131   // ThreadPool instance.
132   explicit ThreadPool(thread::ThreadPoolInterface* user_threadpool);
133 
134   // Waits until all scheduled work has finished and then destroy the
135   // set of threads.
136   ~ThreadPool();
137 
138   // Schedules fn() for execution in the pool of threads.
139   void Schedule(std::function<void()> fn);
140 
141   void SetStealPartitions(
142       const std::vector<std::pair<unsigned, unsigned>>& partitions);
143 
144   void ScheduleWithHint(std::function<void()> fn, int start, int limit);
145 
146   // Returns the number of shards used by ParallelForFixedBlockSizeScheduling
147   // with these parameters.
148   int NumShardsUsedByFixedBlockSizeScheduling(const int64 total,
149                                               const int64 block_size);
150 
151   // Returns the number of threads spawned by calling TransformRangeConcurrently
152   // with these parameters.
153   // Deprecated. Use NumShardsUsedByFixedBlockSizeScheduling.
154   int NumShardsUsedByTransformRangeConcurrently(const int64 block_size,
155                                                 const int64 total);
156 
157   // ParallelFor shards the "total" units of work assuming each unit of work
158   // having roughly "cost_per_unit" cost, in cycles. Each unit of work is
159   // indexed 0, 1, ..., total - 1. Each shard contains 1 or more units of work
160   // and the total cost of each shard is roughly the same.
161   //
162   // "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
163   // if not CPU-bound) to complete a unit of work. Overestimating creates too
164   // many shards and CPU time will be dominated by per-shard overhead, such as
165   // Context creation. Underestimating may not fully make use of the specified
166   // parallelism, and may also cause inefficiencies due to load balancing
167   // issues and stragglers.
168   void ParallelFor(int64 total, int64 cost_per_unit,
169                    const std::function<void(int64, int64)>& fn);
170 
171   // Similar to ParallelFor above, but takes the specified scheduling strategy
172   // into account.
173   void ParallelFor(int64 total, const SchedulingParams& scheduling_params,
174                    const std::function<void(int64, int64)>& fn);
175 
176   // Same as ParallelFor with Fixed Block Size scheduling strategy.
177   // Deprecated. Prefer ParallelFor with a SchedulingStrategy argument.
178   void TransformRangeConcurrently(const int64 block_size, const int64 total,
179                                   const std::function<void(int64, int64)>& fn);
180 
181   // Shards the "total" units of work. For more details, see "ParallelFor".
182   //
183   // The function is passed a thread_id between 0 and NumThreads() *inclusive*.
184   // This is because some work can happen on the caller thread while the threads
185   // in the pool are also being used.
186   //
187   // The caller can allocate NumThreads() + 1 separate buffers for each thread.
188   // Each thread can safely write to the buffer given by its id without
189   // synchronization. However, the worker fn may be called multiple times
190   // sequentially with the same id.
191   //
192   // At most NumThreads() unique ids will actually be used, and only a few may
193   // be used for small workloads. If each buffer is expensive, the buffers
194   // should be stored in an array initially filled with null, and a buffer
195   // should be allocated by fn the first time that the id is used.
196   void ParallelForWithWorkerId(
197       int64 total, int64 cost_per_unit,
198       const std::function<void(int64, int64, int)>& fn);
199 
200   // Similar to ParallelForWithWorkerId above, but takes the specified
201   // scheduling strategy into account.
202   void ParallelForWithWorkerId(
203       int64 total, const SchedulingParams& scheduling_params,
204       const std::function<void(int64, int64, int)>& fn);
205 
206   // Returns the number of threads in the pool.
207   int NumThreads() const;
208 
209   // Returns current thread id between 0 and NumThreads() - 1, if called from a
210   // thread in the pool. Returns -1 otherwise.
211   int CurrentThreadId() const;
212 
213   // If ThreadPool implementation is compatible with Eigen::ThreadPoolInterface,
214   // returns a non-null pointer. The caller does not own the object the returned
215   // pointer points to, and should not attempt to delete.
216   Eigen::ThreadPoolInterface* AsEigenThreadPool() const;
217 
218  private:
219   // Divides the work represented by the range [0, total) into k shards.
220   // Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
221   // Each shard may be executed on a different thread in parallel, depending on
222   // the number of threads available in the pool.
223   // When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
224   // Here, k = NumShardsUsedByFixedBlockSizeScheduling(total, block_size).
225   // Requires 0 < block_size <= total.
226   void ParallelForFixedBlockSizeScheduling(
227       const int64 total, const int64 block_size,
228       const std::function<void(int64, int64)>& fn);
229 
230   // underlying_threadpool_ is the user_threadpool if user_threadpool is
231   // provided in the constructor. Otherwise it is the eigen_threadpool_.
232   Eigen::ThreadPoolInterface* underlying_threadpool_;
233   // eigen_threadpool_ is instantiated and owned by thread::ThreadPool if
234   // user_threadpool is not in the constructor.
235   std::unique_ptr<Eigen::ThreadPoolTempl<EigenEnvironment>> eigen_threadpool_;
236   std::unique_ptr<Eigen::ThreadPoolDevice> threadpool_device_;
237   TF_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
238 };
239 
240 }  // namespace thread
241 }  // namespace tensorflow
242 
243 #endif  // TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_
244