/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#define EIGEN_USE_THREADS

#include "tensorflow/compiler/xla/service/hlo_runner.h"

#include <string>
#include <utility>

#include "absl/memory/memory.h"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/compiler/xla/layout_util.h"
#include "tensorflow/compiler/xla/service/executable.h"
#include "tensorflow/compiler/xla/service/hlo_module_group.h"
#include "tensorflow/compiler/xla/service/hlo_parser.h"
#include "tensorflow/compiler/xla/service/transfer_manager.h"
#include "tensorflow/compiler/xla/shape.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/core/lib/core/blocking_counter.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/types.h"

namespace xla {

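// Constructs a runner backed by `platform`, creating the Backend eagerly.
// A minimal usage sketch follows; it assumes PlatformUtil::GetPlatform (from
// platform_util.h, which this file does not include) is available for the
// platform lookup, so treat it as illustrative rather than canonical:
//
//   se::Platform* platform =
//       PlatformUtil::GetPlatform("Host").ValueOrDie();  // assumed helper
//   HloRunner runner(platform);
//   std::vector<const Literal*> args = {&arg_literal};
//   StatusOr<Literal> result = runner.Execute(
//       std::move(module), args, /*run_hlo_passes=*/true, /*profile=*/nullptr);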
HloRunner::HloRunner(se::Platform* platform, int intra_op_parallelism_threads) {
  BackendOptions backend_options;
  backend_options.set_platform(platform);
  backend_options.set_intra_op_parallelism_threads(
      intra_op_parallelism_threads);
  backend_ = Backend::CreateBackend(backend_options).ConsumeValueOrDie();
  VLOG(1) << "Created HloRunner for platform: " << platform->Name();
}

HloRunner::~HloRunner() {}

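// Allocates device memory shaped like `literal` on the default device and
// copies the literal into it over a stream borrowed from the default stream
// executor. The returned ScopedShapedBuffer owns and frees the memory.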
StatusOr<ScopedShapedBuffer> HloRunner::TransferLiteralToDevice(
    const Literal& literal) {
  TF_ASSIGN_OR_RETURN(ScopedShapedBuffer buffer,
                      backend().transfer_manager()->AllocateScopedShapedBuffer(
                          literal.shape(), backend().memory_allocator(),
                          backend().default_device_ordinal()));
  TF_ASSIGN_OR_RETURN(
      auto stream, backend().BorrowStream(backend().default_stream_executor()));
  TF_RETURN_IF_ERROR(backend().transfer_manager()->TransferLiteralToDevice(
      stream.get(), literal, buffer));
  return std::move(buffer);
}

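// Transfers a sequence of literals to the default device, one buffer per
// literal. The span-of-values overload below just builds a pointer vector
// and forwards to this one.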
StatusOr<std::vector<ScopedShapedBuffer>> HloRunner::TransferLiteralsToDevice(
    absl::Span<const Literal* const> literals) {
  std::vector<ScopedShapedBuffer> buffers;
  for (const Literal* literal : literals) {
    CHECK(literal != nullptr);
    TF_ASSIGN_OR_RETURN(ScopedShapedBuffer buffer,
                        TransferLiteralToDevice(*literal));
    buffers.push_back(std::move(buffer));
  }
  return std::move(buffers);
}

StatusOr<std::vector<ScopedShapedBuffer>> HloRunner::TransferLiteralsToDevice(
    absl::Span<const Literal> literals) {
  std::vector<const Literal*> literal_pointers;
  literal_pointers.reserve(literals.size());
  for (const auto& literal : literals) {
    literal_pointers.push_back(&literal);
  }
  return TransferLiteralsToDevice(literal_pointers);
}

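// Copies `buffer` back from the device into a host Literal over a borrowed
// stream.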
StatusOr<Literal> HloRunner::TransferLiteralFromDevice(
    const ShapedBuffer& buffer) {
  TF_ASSIGN_OR_RETURN(
      auto stream, backend().BorrowStream(backend().default_stream_executor()));
  return backend().transfer_manager()->TransferLiteralFromDevice(stream.get(),
                                                                 buffer);
}

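// End-to-end convenience path: transfers `arguments` to the device, compiles
// and runs `module` (running HLO passes first iff `run_hlo_passes`), then
// transfers the result back to the host.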
StatusOr<Literal> HloRunner::Execute(std::unique_ptr<HloModule> module,
                                     absl::Span<const Literal* const> arguments,
                                     bool run_hlo_passes,
                                     ExecutionProfile* profile) {
  TF_ASSIGN_OR_RETURN(std::vector<ScopedShapedBuffer> argument_buffers,
                      TransferLiteralsToDevice(arguments));
  TF_ASSIGN_OR_RETURN(ExecutionOutput result,
                      ExecuteWithDeviceBuffers(
                          /*module=*/std::move(module),
                          /*arguments=*/argument_buffers,
                          /*run_hlo_passes=*/run_hlo_passes,
                          /*profile=*/profile));
  return TransferLiteralFromDevice(result.Result());
}

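// Like Execute above, but takes a pre-compiled executable, so nothing is
// compiled on this path; the executable is destroyed when this call returns.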
StatusOr<Literal> HloRunner::ExecuteWithExecutable(
    std::unique_ptr<Executable> executable,
    absl::Span<const Literal* const> arguments, ExecutionProfile* profile) {
  TF_ASSIGN_OR_RETURN(std::vector<ScopedShapedBuffer> argument_buffers,
                      TransferLiteralsToDevice(arguments));
  TF_ASSIGN_OR_RETURN(ExecutionOutput result,
                      ExecuteWithDeviceBuffers(
                          /*executable=*/executable.get(),
                          /*arguments=*/argument_buffers,
                          /*profile=*/profile));
  return TransferLiteralFromDevice(result.Result());
}

// Converts the owning buffers of `inputs` into a vector of ExecutionInputs.
// Each ExecutionInput owns only the buffers that `alias_config` marks as
// aliased with an output; all other buffers are passed through unowned.
static std::vector<ExecutionInput> ExecutionInputsFromScopedShapedBuffers(
    absl::Span<ScopedShapedBuffer const> inputs,
    HloInputOutputAliasConfig alias_config, int device_ordinal,
    se::DeviceMemoryAllocator* allocator) {
  std::vector<ExecutionInput> execution_inputs;

  for (int param_num = 0; param_num < inputs.size(); param_num++) {
    const ScopedShapedBuffer& input_buffer = inputs[param_num];
    ShapeTree<MaybeOwningDeviceMemory> buffer_tree(
        input_buffer.on_device_shape());

    input_buffer.buffers().ForEachElement(
        [&](const ShapeIndex& index,
            const se::DeviceMemoryBase& execution_input_buffer) {
          if (alias_config.ParameterHasAlias(param_num, index)) {
            // Store owned.
            *buffer_tree.mutable_element(index) = se::OwningDeviceMemory{
                execution_input_buffer, device_ordinal, allocator};
          } else {
            // Store unowned.
            *buffer_tree.mutable_element(index) = execution_input_buffer;
          }
        });
    execution_inputs.emplace_back(std::move(buffer_tree));
  }
  return execution_inputs;
}

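// Compiles `module` (via CreateExecutable) and then runs it against
// device-resident arguments using the executable overload below.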
StatusOr<ExecutionOutput> HloRunner::ExecuteWithDeviceBuffers(
    std::unique_ptr<HloModule> module,
    absl::Span<ScopedShapedBuffer const> arguments, bool run_hlo_passes,
    ExecutionProfile* profile) {
  TF_ASSIGN_OR_RETURN(std::unique_ptr<Executable> executable,
                      CreateExecutable(std::move(module), run_hlo_passes));
  return ExecuteWithDeviceBuffers(executable.get(), arguments, profile);
}

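// Runs `executable` on the default device. Argument buffers that alias an
// output (per the module's input/output alias config) have their ownership
// transferred into the ExecutionInputs; the call blocks until the stream has
// drained.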
StatusOr<ExecutionOutput> HloRunner::ExecuteWithDeviceBuffers(
    Executable* executable, absl::Span<ScopedShapedBuffer const> arguments,
    ExecutionProfile* profile) {
  // Get service run options.
  se::Stream stream(backend().default_stream_executor());
  stream.Init();
  ServiceExecutableRunOptions service_run_options =
      GetServiceRunOptionsForDevice(backend().default_device_ordinal(), &stream,
                                    nullptr, RunId());
  service_run_options.mutable_run_options()->set_execution_profile(profile);

  std::vector<ExecutionInput> execution_arguments =
      ExecutionInputsFromScopedShapedBuffers(
          arguments, executable->module().input_output_alias_config(),
          stream.parent()->device_ordinal(), stream.parent()->GetAllocator());

  TF_ASSIGN_OR_RETURN(
      ExecutionOutput retval,
      executable->ExecuteOnStreamWrapper(&service_run_options,
                                         std::move(execution_arguments)));
  TF_RETURN_IF_ERROR(stream.BlockHostUntilDone());
  return std::move(retval);
}

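// Compiles `module` once and runs one copy per replica as described by
// `options` and `device_assignment`.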
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
    std::unique_ptr<HloModule> module, const ReplicatedExecuteOptions& options,
    DeviceAssignment* device_assignment) {
  TF_ASSIGN_OR_RETURN(
      std::unique_ptr<Executable> executable,
      CreateExecutable(std::move(module), options.run_hlo_passes));
  return ExecuteReplicated(executable.get(), options, device_assignment);
}

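// Shared scaffolding for the ExecuteReplicated overloads: sets up one stream
// and one ServiceExecutableRunOptions per replica, transfers each replica's
// arguments to its assigned device, optionally spawns infeed/outfeed threads,
// and delegates the actual execution to `execution_helper`. Note that the
// outfeed loops below are also bounded by `options.infeed_steps`.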
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicatedImpl(
    std::function<StatusOr<std::vector<ScopedShapedBuffer>>(
        const std::vector<ServiceExecutableRunOptions>&,
        const std::vector<absl::Span<const ShapedBuffer* const>>&)>
        execution_helper,
    std::function<int64(int64)> argument_count_provider,
    std::function<const Literal*(int64, int64)> argument_provider,
    const ReplicatedExecuteOptions& options,
    DeviceAssignment* device_assignment) {
  std::vector<std::unique_ptr<se::Stream>> streams;
  std::vector<ServiceExecutableRunOptions> service_run_options;

  std::vector<ScopedShapedBuffer> argument_buffers;
  // This reserve() call is necessary for correctness, because
  // argument_buffer_ptrs contains pointers into the elements of
  // argument_buffers.
  const int64 total_argument_count = [&]() {
    int64 total = 0;
    for (int64 i = 0; i < options.num_replicas; ++i) {
      total += argument_count_provider(i);
    }
    return total;
  }();
  argument_buffers.reserve(total_argument_count);

  // Plus one so we can safely get &argument_buffer_ptrs[0] in case there are
  // no arguments.
  std::vector<const ShapedBuffer*> argument_buffer_ptrs(total_argument_count +
                                                        1);
  std::vector<absl::Span<const ShapedBuffer* const>> argument_buffer_slices;
  int64 index = 0;
  RunId run_id;
  for (int64 i = 0; i < options.num_replicas; ++i) {
    int64 device = (*device_assignment)(i, 0);
    TF_ASSIGN_OR_RETURN(se::StreamExecutor * executor,
                        backend().stream_executor(device));
    streams.push_back(absl::make_unique<se::Stream>(executor));
    streams.back()->Init();
    service_run_options.emplace_back(GetServiceRunOptionsForDevice(
        device, streams.back().get(), device_assignment, run_id));

    // Copy arguments to device.
    const int64 argument_count = argument_count_provider(i);
    for (int64 arg_index = 0; arg_index < argument_count; arg_index++) {
      const Literal* const argument = argument_provider(i, arg_index);
      TF_RET_CHECK(argument != nullptr);
      TF_ASSIGN_OR_RETURN(
          ScopedShapedBuffer argument_buffer,
          backend().transfer_manager()->AllocateScopedShapedBuffer(
              argument->shape(), backend().memory_allocator(), device));
      TF_RETURN_IF_ERROR(backend().transfer_manager()->TransferLiteralToDevice(
          streams.back().get(), *argument, argument_buffer));
      argument_buffers.push_back(std::move(argument_buffer));
      argument_buffer_ptrs[index++] = &argument_buffers.back();
    }
    argument_buffer_slices.emplace_back(
        &argument_buffer_ptrs[index - argument_count], argument_count);
  }

  std::unique_ptr<tensorflow::thread::ThreadPool> pool;
  int64 num_threads = (options.infeed != nullptr) ? options.num_replicas : 0;
  if (ShapeUtil::IsInitialized(options.outfeed_shape)) {
    num_threads += options.num_replicas;
  }
  if (num_threads > 0) {
    pool = absl::make_unique<tensorflow::thread::ThreadPool>(
        tensorflow::Env::Default(), "infeed_outfeed",
        /*num_threads=*/num_threads);
  }
  if (options.infeed != nullptr) {
    for (int64 i = 0; i < options.num_replicas; ++i) {
      int64 device = (*device_assignment)(i, 0);
      pool->Schedule([this, device, &options]() {
        se::StreamExecutor* executor =
            backend().stream_executor(device).ValueOrDie();
        VLOG(1) << "Starting infeed on device " << device;
        for (int64 step = 1;
             options.infeed_steps < 0 || step <= options.infeed_steps; ++step) {
          TF_CHECK_OK(backend().transfer_manager()->TransferLiteralToInfeed(
              executor, *options.infeed));
          if (step % 100 == 0) {
            VLOG(1) << "Infeed step " << step;
          }
        }
      });
    }
  }
  if (ShapeUtil::IsInitialized(options.outfeed_shape)) {
    for (int64 i = 0; i < options.num_replicas; ++i) {
      int64 device = (*device_assignment)(i, 0);
      pool->Schedule([this, device, &options]() {
        se::StreamExecutor* executor =
            backend().stream_executor(device).ValueOrDie();
        VLOG(1) << "Starting outfeed on device " << device;
        for (int64 step = 1;
             options.infeed_steps < 0 || step <= options.infeed_steps; ++step) {
          Literal literal(options.outfeed_shape);
          TF_CHECK_OK(backend().transfer_manager()->TransferLiteralFromOutfeed(
              executor, &literal));
          if (options.outfeed_values != nullptr) {
            options.outfeed_values->push_back(std::move(literal));
          }
          if (step % 100 == 0) {
            VLOG(1) << "Outfeed step " << step;
          }
        }
      });
    }
  }

  LOG(INFO) << "Replicated execution started";
  TF_ASSIGN_OR_RETURN(
      std::vector<ScopedShapedBuffer> results,
      execution_helper(service_run_options, argument_buffer_slices));
  LOG(INFO) << "Replicated execution terminated";

  std::vector<Literal> exec_results;
  for (int64 i = 0; i < options.num_replicas; ++i) {
    TF_RETURN_IF_ERROR(streams[i]->BlockHostUntilDone());
    TF_ASSIGN_OR_RETURN(Literal literal,
                        backend().transfer_manager()->TransferLiteralFromDevice(
                            streams[i].get(), results[i]));
    exec_results.push_back(std::move(literal));
  }
  return std::move(exec_results);
}

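// Replicated execution of a single pre-compiled executable. When
// `options.use_threads` is set, each replica calls ExecuteOnStream on its own
// thread; otherwise all replicas run through a single ExecuteOnStreams call.
// (`profile` is accepted but not consumed in this body.)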
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
    Executable* executable, const ReplicatedExecuteOptions& options,
    DeviceAssignment* device_assignment, ExecutionProfile* profile) {
  return ExecuteReplicatedImpl(
      [&](const std::vector<ServiceExecutableRunOptions>& service_run_options,
          const std::vector<absl::Span<const ShapedBuffer* const>>&
              argument_buffer_slices)
          -> StatusOr<std::vector<ScopedShapedBuffer>> {
        std::vector<ScopedShapedBuffer> results;
        if (!options.use_threads) {
          TF_ASSIGN_OR_RETURN(
              results, executable->ExecuteOnStreams(service_run_options,
                                                    argument_buffer_slices));
        } else {
          tensorflow::mutex mutex;
          std::vector<StatusOr<ScopedShapedBuffer>> thread_results(
              options.num_replicas);
          {
            LOG(INFO) << "Creating thread pool for " << options.num_replicas
                      << " replicas";
            tensorflow::thread::ThreadPool pool(
                tensorflow::Env::Default(), "replicas", options.num_replicas);
            for (int64 i = 0; i < options.num_replicas; ++i) {
              pool.Schedule([&, i] {
                auto result = executable->ExecuteOnStream(
                    &service_run_options[i], argument_buffer_slices[i],
                    nullptr);
                tensorflow::mutex_lock lock(mutex);
                thread_results[i] = std::move(result);
              });
            }

            // Note: the thread pool destructor guarantees it completes all work
            // before we leave this scope.
          }
          for (auto& thread_result : thread_results) {
            if (!thread_result.ok()) {
              return thread_result.status();
            }
            results.push_back(std::move(thread_result).ValueOrDie());
          }
        }
        return results;
      },
      [&](int64 replica) { return options.arguments.size(); },
      [&](int64 replica, int64 index) { return options.arguments[index]; },
      options, device_assignment);
}

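// Replicated execution where each replica may run a different executable,
// supplied per replica by `executable_provider`. Only the thread-per-replica
// mode is supported here, as the TF_RET_CHECK below enforces.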
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
    std::function<Executable*(int64)> executable_provider,
    std::function<int64(int64)> argument_count_provider,
    std::function<const Literal*(int64, int64)> argument_provider,
    const ReplicatedExecuteOptions& options) {
  TF_ASSIGN_OR_RETURN(
      DeviceAssignment device_assignment,
      backend().computation_placer()->AssignDevices(options.num_replicas, 1));
  return ExecuteReplicatedImpl(
      [&](const std::vector<ServiceExecutableRunOptions>& service_run_options,
          const std::vector<absl::Span<const ShapedBuffer* const>>&
              argument_buffer_slices)
          -> StatusOr<std::vector<ScopedShapedBuffer>> {
        TF_RET_CHECK(options.use_threads);
        std::vector<ScopedShapedBuffer> results;
        tensorflow::mutex mutex;
        std::vector<StatusOr<ScopedShapedBuffer>> thread_results(
            options.num_replicas);
        {
          LOG(INFO) << "Creating thread pool for " << options.num_replicas
                    << " replicas";
          tensorflow::thread::ThreadPool pool(tensorflow::Env::Default(),
                                              "replicas", options.num_replicas);
          for (int64 i = 0; i < options.num_replicas; ++i) {
            for (const auto& arg : argument_buffer_slices[i]) {
              TF_RET_CHECK(arg != nullptr);
            }
            pool.Schedule([&, i] {
              auto result = executable_provider(i)->ExecuteOnStream(
                  &service_run_options[i], argument_buffer_slices[i], nullptr);
              tensorflow::mutex_lock lock(mutex);
              thread_results[i] = std::move(result);
            });
          }

          // Note: the thread pool destructor guarantees it completes all work
          // before we leave this scope.
        }
        for (auto& thread_result : thread_results) {
          if (!thread_result.ok()) {
            return thread_result.status();
          }
          results.push_back(std::move(thread_result).ValueOrDie());
        }
        return results;
      },
      argument_count_provider, argument_provider, options, &device_assignment);
}

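// Convenience overload that computes a default device assignment
// (num_replicas x 1) before running.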
StatusOr<std::vector<Literal>> HloRunner::ExecuteReplicated(
    std::unique_ptr<HloModule> module,
    const ReplicatedExecuteOptions& options) {
  TF_ASSIGN_OR_RETURN(
      DeviceAssignment device_assignment,
      backend().computation_placer()->AssignDevices(options.num_replicas, 1));
  return ExecuteReplicated(std::move(module), options, &device_assignment);
}

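// Compiles `module` for the default stream executor. With `run_hlo_passes`
// the full Compile pipeline (HLO passes plus backend code generation) runs;
// otherwise only RunBackend is invoked, so the module is code-generated
// without the optimization passes.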
StatusOr<std::unique_ptr<Executable>> HloRunner::CreateExecutable(
    std::unique_ptr<HloModule> module, bool run_hlo_passes) {
  if (run_hlo_passes) {
    auto module_group = absl::make_unique<HloModuleGroup>(std::move(module));
    TF_ASSIGN_OR_RETURN(
        auto executables,
        backend().compiler()->Compile(std::move(module_group),
                                      {{backend().default_stream_executor()}},
                                      backend().memory_allocator()));
    return std::move(executables[0]);
  }
  return backend().compiler()->RunBackend(std::move(module),
                                          backend().default_stream_executor(),
                                          backend().memory_allocator());
}

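// Builds the per-execution run options: device ordinal, stream, allocator,
// Eigen intra-op thread pool, optional device assignment, and run id.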
ServiceExecutableRunOptions HloRunner::GetServiceRunOptionsForDevice(
    int64 device, se::Stream* stream, DeviceAssignment* device_assignment,
    RunId run_id) {
  ExecutableRunOptions run_options;
  run_options.set_device_ordinal(device);
  run_options.set_stream(stream);
  run_options.set_allocator(backend().memory_allocator());
  run_options.set_intra_op_thread_pool(
      backend().eigen_intra_op_thread_pool_device());
  if (device_assignment != nullptr) {
    run_options.set_device_assignment(device_assignment);
  }
  run_options.set_run_id(run_id);
  return ServiceExecutableRunOptions(run_options, backend().StreamBorrower());
}

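// Lazily creates a default backend if the constructor has not set one; the
// const overload simply forwards to this accessor.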
Backend& HloRunner::backend() {
  if (!backend_) {
    backend_ = Backend::CreateDefaultBackend().ConsumeValueOrDie();
    VLOG(1) << "Executing on platform " << backend().platform()->Name();
  }
  return *backend_;
}

const Backend& HloRunner::backend() const {
  return const_cast<HloRunner*>(this)->backend();
}

}  // namespace xla