1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #ifndef TENSORFLOW_CORE_PLATFORM_DEFAULT_UNBOUNDED_WORK_QUEUE_H_
16 #define TENSORFLOW_CORE_PLATFORM_DEFAULT_UNBOUNDED_WORK_QUEUE_H_
17 
18 #include <deque>
19 #include <memory>
20 #include <vector>
21 
22 #include "tensorflow/core/lib/core/notification.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/mutex.h"
25 
26 namespace tensorflow {
27 
28 // An `UnboundedWorkQueue` provides a mechanism for temporally multiplexing a
29 // potentially large number of "logical" threads onto a smaller number of
30 // "physical" threads. The multiplexing is achieved by maintaining an internal
31 // pool of long-running "physical" threads that are used to execute the
32 // "logical" threads.  Like a regular thread, a "logical" thread may block on
33 // other threads, and the size of the pool will increase to ensure that progress
34 // is made. This mechanism is recommended in situations where short-lived
35 // threads are created repeatedly, to avoid the overhead and memory
36 // fragmentation that can result from excessive thread creation.
37 class UnboundedWorkQueue {
38  public:
39   UnboundedWorkQueue(Env* env, const string& thread_name,
40                      const ThreadOptions& thread_options = {});
41   ~UnboundedWorkQueue();
42 
43   using WorkFunction = std::function<void()>;
44 
45   // Schedule `fn` on a thread.  `fn` may perform blocking work, so if all the
46   // existing threads are blocked or busy, this may spawn a new thread which
47   // will be added to the thread pool managed by this work queue.
48   void Schedule(WorkFunction fn);
49 
50  private:
51   void PooledThreadFunc();
52 
53   Env* const env_;  // Not owned.
54   const string thread_name_;
55   const ThreadOptions thread_options_;
56   mutex work_queue_mu_;
57   condition_variable work_queue_cv_ TF_GUARDED_BY(work_queue_mu_);
58   size_t num_idle_threads_ TF_GUARDED_BY(work_queue_mu_) = 0;
59   bool cancelled_ TF_GUARDED_BY(work_queue_mu_) = false;
60   std::deque<WorkFunction> work_queue_ TF_GUARDED_BY(work_queue_mu_);
61   mutex thread_pool_mu_;
62   std::vector<std::unique_ptr<Thread>> thread_pool_
63       TF_GUARDED_BY(thread_pool_mu_);
64 };
65 
66 }  // namespace tensorflow
67 
68 #endif  // TENSORFLOW_CORE_PLATFORM_DEFAULT_UNBOUNDED_WORK_QUEUE_H_
69