/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_PLATFORM_DEFAULT_UNBOUNDED_WORK_QUEUE_H_
#define TENSORFLOW_CORE_PLATFORM_DEFAULT_UNBOUNDED_WORK_QUEUE_H_

#include <deque>
#include <functional>
#include <memory>
#include <vector>

#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"

namespace tensorflow {

// An `UnboundedWorkQueue` provides a mechanism for temporally multiplexing a
// potentially large number of "logical" threads onto a smaller number of
// "physical" threads. The multiplexing is achieved by maintaining an internal
// pool of long-running "physical" threads that are used to execute the
// "logical" threads. Like a regular thread, a "logical" thread may block on
// other threads, and the size of the pool will increase to ensure that
// progress is made. This mechanism is recommended in situations where
// short-lived threads are created repeatedly, to avoid the overhead and memory
// fragmentation that can result from excessive thread creation.
class UnboundedWorkQueue {
 public:
  UnboundedWorkQueue(Env* env, const string& thread_name,
                     const ThreadOptions& thread_options = {});
  ~UnboundedWorkQueue();

  using WorkFunction = std::function<void()>;

  // Schedule `fn` on a thread. `fn` may perform blocking work, so if all the
  // existing threads are blocked or busy, this may spawn a new thread which
  // will be added to the thread pool managed by this work queue.
  void Schedule(WorkFunction fn);

 private:
  // Function executed by each long-running thread in the pool.
  void PooledThreadFunc();

  Env* const env_;  // Not owned.
  const string thread_name_;
  const ThreadOptions thread_options_;
  mutex work_queue_mu_;
  condition_variable work_queue_cv_ TF_GUARDED_BY(work_queue_mu_);
  size_t num_idle_threads_ TF_GUARDED_BY(work_queue_mu_) = 0;
  bool cancelled_ TF_GUARDED_BY(work_queue_mu_) = false;
  std::deque<WorkFunction> work_queue_ TF_GUARDED_BY(work_queue_mu_);
  mutex thread_pool_mu_;
  std::vector<std::unique_ptr<Thread>> thread_pool_
      TF_GUARDED_BY(thread_pool_mu_);
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_PLATFORM_DEFAULT_UNBOUNDED_WORK_QUEUE_H_
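
// Example usage (a minimal sketch, not part of the original header; the
// `Env::Default()` call, the queue name, and the lambda body are illustrative
// assumptions):
//
//   UnboundedWorkQueue work_queue(Env::Default(), "example_unbounded_queue");
//   Notification done;
//   work_queue.Schedule([&done]() {
//     // Potentially blocking work runs on one of the pooled threads; the
//     // queue may spawn a new thread if all existing threads are busy or
//     // blocked.
//     done.Notify();
//   });
//   done.WaitForNotification();  // Wait for the scheduled work to finish.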