1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #define EIGEN_USE_THREADS
17 
18 #include "tensorflow/compiler/xla/service/backend.h"
19 
20 #include <algorithm>
21 #include <string>
22 #include <utility>
23 
24 #include "absl/memory/memory.h"
25 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
26 #include "tensorflow/compiler/xla/service/compiler.h"
27 #include "tensorflow/compiler/xla/service/platform_util.h"
28 #include "tensorflow/compiler/xla/status_macros.h"
29 #include "tensorflow/compiler/xla/statusor.h"
30 #include "tensorflow/compiler/xla/types.h"
31 #include "tensorflow/compiler/xla/util.h"
32 #include "tensorflow/core/lib/core/errors.h"
33 #include "tensorflow/core/lib/core/threadpool.h"
34 #include "tensorflow/core/platform/byte_order.h"
35 #include "tensorflow/core/platform/cpu_info.h"
36 #include "tensorflow/core/platform/env.h"
37 #include "tensorflow/core/platform/logging.h"
38 #include "tensorflow/core/platform/stream_executor_no_cuda.h"
39 
40 namespace xla {
41 
set_platform(se::Platform * platform)42 BackendOptions& BackendOptions::set_platform(se::Platform* platform) {
43   platform_ = platform;
44   return *this;
45 }
46 
platform() const47 se::Platform* BackendOptions::platform() const { return platform_; }
48 
set_intra_op_parallelism_threads(int num_threads)49 BackendOptions& BackendOptions::set_intra_op_parallelism_threads(
50     int num_threads) {
51   intra_op_parallelism_threads_ = num_threads;
52   return *this;
53 }
54 
intra_op_parallelism_threads() const55 int BackendOptions::intra_op_parallelism_threads() const {
56   return intra_op_parallelism_threads_;
57 }
58 
set_allowed_devices(const absl::optional<std::set<int>> & allowed_devices)59 BackendOptions& BackendOptions::set_allowed_devices(
60     const absl::optional<std::set<int>>& allowed_devices) {
61   allowed_devices_ = allowed_devices;
62   return *this;
63 }
64 
allowed_devices() const65 const absl::optional<std::set<int>>& BackendOptions::allowed_devices() const {
66   return allowed_devices_;
67 }
68 
69 namespace {
70 
71 class EigenThreadPoolWrapper : public Eigen::ThreadPoolInterface {
72  public:
EigenThreadPoolWrapper(tensorflow::thread::ThreadPool * pool)73   explicit EigenThreadPoolWrapper(tensorflow::thread::ThreadPool* pool)
74       : pool_(pool) {}
~EigenThreadPoolWrapper()75   ~EigenThreadPoolWrapper() override {}
76 
Schedule(std::function<void ()> fn)77   void Schedule(std::function<void()> fn) override {
78     pool_->Schedule(std::move(fn));
79   }
NumThreads() const80   int NumThreads() const override { return pool_->NumThreads(); }
CurrentThreadId() const81   int CurrentThreadId() const override { return pool_->CurrentThreadId(); }
82 
83  private:
84   tensorflow::thread::ThreadPool* pool_ = nullptr;
85 };
86 
87 }  // namespace
88 
89 // Define this in .cc file to avoid having to include eigen or forward declare
90 // these types in the header.
91 struct Backend::IntraOpThreadPool {
IntraOpThreadPoolxla::Backend::IntraOpThreadPool92   explicit IntraOpThreadPool(const int num_threads)
93       : pool(new tensorflow::thread::ThreadPool(tensorflow::Env::Default(),
94                                                 "XLAEigen", num_threads)),
95         wrapper(new EigenThreadPoolWrapper(pool.get())),
96         device(new Eigen::ThreadPoolDevice(wrapper.get(),
97                                            wrapper->NumThreads())) {}
98 
99   std::unique_ptr<tensorflow::thread::ThreadPool> pool;
100   std::unique_ptr<EigenThreadPoolWrapper> wrapper;
101   std::unique_ptr<Eigen::ThreadPoolDevice> device;
102 };
103 
CreateBackend(const BackendOptions & options)104 /* static */ StatusOr<std::unique_ptr<Backend>> Backend::CreateBackend(
105     const BackendOptions& options) {
106   se::Platform* platform = options.platform();
107   TF_ASSIGN_OR_RETURN(auto compiler, Compiler::GetForPlatform(platform));
108   TF_ASSIGN_OR_RETURN(
109       auto stream_executors,
110       PlatformUtil::GetStreamExecutors(platform, options.allowed_devices()));
111   TF_ASSIGN_OR_RETURN(auto transfer_manager,
112                       TransferManager::GetForPlatform(platform));
113   TF_ASSIGN_OR_RETURN(auto computation_placer,
114                       ComputationPlacer::GetForPlatform(platform));
115   std::unique_ptr<Backend> backend(
116       new Backend(platform, compiler, stream_executors, transfer_manager,
117                   computation_placer, options.intra_op_parallelism_threads()));
118   return std::move(backend);
119 }
120 
121 /* static */ StatusOr<std::unique_ptr<Backend>>
CreateDefaultBackend()122 Backend::CreateDefaultBackend() {
123   TF_ASSIGN_OR_RETURN(se::Platform * platform,
124                       PlatformUtil::GetDefaultPlatform());
125   BackendOptions backend_options;
126   backend_options.set_platform(platform);
127   return CreateBackend(backend_options);
128 }
129 
BorrowStream(int device_ordinal)130 StatusOr<StreamPool::Ptr> Backend::BorrowStream(int device_ordinal) {
131   TF_ASSIGN_OR_RETURN(auto executor, stream_executor(device_ordinal));
132   return BorrowStream(executor);
133 }
134 
BorrowStream(se::StreamExecutor * executor)135 StatusOr<StreamPool::Ptr> Backend::BorrowStream(se::StreamExecutor* executor) {
136   tensorflow::mutex_lock l(mu_);
137   if (!stream_pools_.contains(executor)) {
138     stream_pools_.emplace(executor, absl::make_unique<StreamPool>());
139   }
140   return stream_pools_.at(executor)->BorrowStream(executor);
141 }
142 
Backend(se::Platform * platform,Compiler * compiler,absl::Span<se::StreamExecutor * const> stream_executors,TransferManager * transfer_manager,ComputationPlacer * computation_placer,int intra_op_parallelism_threads)143 Backend::Backend(se::Platform* platform, Compiler* compiler,
144                  absl::Span<se::StreamExecutor* const> stream_executors,
145                  TransferManager* transfer_manager,
146                  ComputationPlacer* computation_placer,
147                  int intra_op_parallelism_threads)
148     : platform_(platform),
149       compiler_(compiler),
150       transfer_manager_(transfer_manager),
151       computation_placer_(computation_placer) {
152   // The given set of stream executors set may include invalid executors.
153   for (se::StreamExecutor* exec : stream_executors) {
154     if (exec != nullptr) {
155       stream_executors_.push_back(exec);
156     }
157   }
158   // Create a memory allocator for the valid stream executors.
159   memory_allocator_ = absl::make_unique<StreamExecutorMemoryAllocator>(
160       platform, stream_executors);
161   CHECK(!stream_executors_.empty())
162       << "Service found no devices for backend " << platform_->Name() << '.';
163 
164   if (platform->id() == se::host::kHostPlatformId) {
165     const int num_threads = intra_op_parallelism_threads > 0
166                                 ? intra_op_parallelism_threads
167                                 : tensorflow::port::NumSchedulableCPUs();
168     intra_op_thread_pool_.reset(new IntraOpThreadPool(num_threads));
169   }
170 }
171 
~Backend()172 Backend::~Backend() {}
173 
default_device_ordinal() const174 int Backend::default_device_ordinal() const {
175   return default_stream_executor()->device_ordinal();
176 }
177 
eigen_intra_op_thread_pool_device() const178 const Eigen::ThreadPoolDevice* Backend::eigen_intra_op_thread_pool_device()
179     const {
180   if (intra_op_thread_pool_ == nullptr) {
181     return nullptr;
182   }
183   return intra_op_thread_pool_->device.get();
184 }
185 
eigen_intra_op_thread_pool() const186 tensorflow::thread::ThreadPool* Backend::eigen_intra_op_thread_pool() const {
187   if (intra_op_thread_pool_ == nullptr) {
188     return nullptr;
189   }
190   return intra_op_thread_pool_->pool.get();
191 }
192 
stream_executor(int device_ordinal) const193 StatusOr<se::StreamExecutor*> Backend::stream_executor(
194     int device_ordinal) const {
195   if (device_ordinal < 0 ||
196       device_ordinal > stream_executors_.back()->device_ordinal()) {
197     return InvalidArgument(
198         "Invalid device ordinal value (%d). Valid range is [0, %d].",
199         device_ordinal, stream_executors_.back()->device_ordinal());
200   }
201   for (auto* executor : stream_executors_) {
202     if (executor->device_ordinal() == device_ordinal) {
203       return executor;
204     }
205   }
206   return InvalidArgument("device %s not supported by XLA service",
207                          device_name(device_ordinal));
208 }
209 
devices_equivalent(int device_ordinal_a,int device_ordinal_b)210 StatusOr<bool> Backend::devices_equivalent(int device_ordinal_a,
211                                            int device_ordinal_b) {
212   // Use the name from device description to determine equivalence. This is a
213   // bit crude but works for GPUs which is the important case where we compile
214   // an executable for one GPU and want to know if it will run (well) on
215   // another.
216   TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor_a,
217                       stream_executor(device_ordinal_a));
218   TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor_b,
219                       stream_executor(device_ordinal_b));
220   return (executor_a->GetDeviceDescription().name() ==
221           executor_b->GetDeviceDescription().name());
222 }
223 
ResetDevices()224 Status Backend::ResetDevices() {
225   return transfer_manager_->ResetDevices(stream_executors_);
226 }
227 
228 }  // namespace xla
229