1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/default/unbounded_work_queue.h"
17 
18 #include "absl/memory/memory.h"
19 #include "tensorflow/core/platform/env.h"
20 #include "tensorflow/core/platform/mutex.h"
21 #include "tensorflow/core/platform/numa.h"
22 
23 namespace tensorflow {
24 
UnboundedWorkQueue(Env * env,const string & thread_name,const ThreadOptions & thread_options)25 UnboundedWorkQueue::UnboundedWorkQueue(Env* env, const string& thread_name,
26                                        const ThreadOptions& thread_options)
27     : env_(env), thread_name_(thread_name), thread_options_(thread_options) {}
28 
~UnboundedWorkQueue()29 UnboundedWorkQueue::~UnboundedWorkQueue() {
30   {
31     mutex_lock l(work_queue_mu_);
32     // Wake up all `PooledThreadFunc` threads and cause them to terminate before
33     // joining them when `threads_` is cleared.
34     cancelled_ = true;
35     work_queue_cv_.notify_all();
36     if (!work_queue_.empty()) {
37       LOG(ERROR) << "UnboundedWorkQueue named \"" << thread_name_ << "\" was "
38                  << "deleted with pending work in its queue. This may indicate "
39                  << "a potential use-after-free bug.";
40     }
41   }
42 
43   {
44     mutex_lock l(thread_pool_mu_);
45     // Clear the list of pooled threads, which will eventually terminate due to
46     // the previous notification.
47     //
48     // NOTE: It is safe to do this while holding `thread_pool_mu_`, because
49     // no subsequent calls to `this->Schedule()` should be issued after the
50     // destructor starts.
51     thread_pool_.clear();
52   }
53 }
54 
Schedule(WorkFunction fn)55 void UnboundedWorkQueue::Schedule(WorkFunction fn) {
56   // Enqueue a work item for the new thread's function, and wake up a
57   // cached thread to process it.
58   mutex_lock l(work_queue_mu_);
59   work_queue_.push_back(std::move(fn));
60   work_queue_cv_.notify_one();
61   // NOTE: The queue may be non-empty, so we must account for queued work when
62   // considering how many threads are free.
63   if (work_queue_.size() > num_idle_threads_) {
64     // Spawn a new physical thread to process the given function.
65     // NOTE: `PooledThreadFunc` will eventually increment `num_idle_threads_`
66     // at the beginning of its work loop.
67     Thread* new_thread =
68         env_->StartThread({}, thread_name_, [this]() { PooledThreadFunc(); });
69 
70     mutex_lock l(thread_pool_mu_);
71     thread_pool_.emplace_back(new_thread);
72   }
73 }
74 
PooledThreadFunc()75 void UnboundedWorkQueue::PooledThreadFunc() {
76   // If specified, make sure the thread runs on the correct NUMA node.
77   if (thread_options_.numa_node != port::kNUMANoAffinity) {
78     port::NUMASetThreadNodeAffinity(thread_options_.numa_node);
79   }
80 
81   while (true) {
82     WorkFunction fn;
83     {
84       mutex_lock l(work_queue_mu_);
85       ++num_idle_threads_;
86       while (!cancelled_ && work_queue_.empty()) {
87         // Wait for a new work function to be submitted, or the cache to be
88         // destroyed.
89         work_queue_cv_.wait(l);
90       }
91       if (cancelled_) {
92         return;
93       }
94       fn = std::move(work_queue_.front());
95       work_queue_.pop_front();
96       --num_idle_threads_;
97     }
98 
99     fn();
100   }
101 }
102 
103 }  // namespace tensorflow
104