1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/common_runtime/copy_tensor.h"
17 
18 #include <atomic>
19 #include <utility>
20 #include <vector>
21 
22 #include "tensorflow/core/common_runtime/dma_helper.h"
23 #include "tensorflow/core/framework/variant_op_registry.h"
24 #include "tensorflow/core/lib/core/errors.h"
25 #include "tensorflow/core/lib/core/refcount.h"
26 #include "tensorflow/core/platform/logging.h"
27 #include "tensorflow/core/profiler/lib/scoped_annotation.h"
28 #include "tensorflow/core/util/reffed_status_callback.h"
29 
30 namespace tensorflow {
31 namespace {
32 
33 struct RegistrationInfo {
RegistrationInfotensorflow::__anon11a90ad10111::RegistrationInfo34   RegistrationInfo(DeviceType s, DeviceType r, CopyTensor::CopyFunction cf)
35       : sender_device_type(std::move(s)),
36         receiver_device_type(std::move(r)),
37         copy_function(cf) {}
38   DeviceType sender_device_type;
39   DeviceType receiver_device_type;
40   CopyTensor::CopyFunction copy_function;
41 };
42 
43 // We use a vector instead of a map since we expect there to be very
44 // few registrations.
MutableRegistry()45 std::vector<RegistrationInfo>* MutableRegistry() {
46   static std::vector<RegistrationInfo>* registry =
47       new std::vector<RegistrationInfo>;
48   return registry;
49 }
50 
CopyHostToDevice(const Tensor * input,Allocator * cpu_allocator,Allocator * out_allocator,StringPiece edge_name,Device * dst,Tensor * output,DeviceContext * recv_dev_context,StatusCallback done,bool sync_dst_compute)51 void CopyHostToDevice(const Tensor* input, Allocator* cpu_allocator,
52                       Allocator* out_allocator, StringPiece edge_name,
53                       Device* dst, Tensor* output,
54                       DeviceContext* recv_dev_context, StatusCallback done,
55                       bool sync_dst_compute) {
56   if (input->dtype() == DT_VARIANT) {
57     Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
58     auto* status_cb = new ReffedStatusCallback(std::move(done));
59     core::ScopedUnref status_cb_unref(status_cb);
60 
61     auto wrapped_done = [status_cb](const Status& s) {
62       status_cb->UpdateStatus(s);
63       status_cb->Unref();
64     };
65     auto copier =
66         [dst, recv_dev_context, out_allocator, status_cb, cpu_allocator,
67          edge_name, sync_dst_compute, wrapped_done = std::move(wrapped_done)](
68             const Tensor& from, Tensor* to) {
69           if (from.dtype() == DT_VARIANT) {
70             status_cb->Ref();
71             CopyHostToDevice(&from, cpu_allocator, out_allocator, edge_name,
72                              dst, to, recv_dev_context, wrapped_done,
73                              sync_dst_compute);
74             return Status::OK();
75           } else {
76             if (!DMAHelper::CanUseDMA(&from)) {
77               Status err = errors::InvalidArgument(
78                   "During Variant Host->Device Copy: "
79                   "non-DMA-copy attempted of tensor type: ",
80                   DataTypeString(from.dtype()));
81               status_cb->UpdateStatus(err);
82               return err;
83             }
84             if (status_cb->ok()) {
85               status_cb->Ref();
86               *to = Tensor(out_allocator, from.dtype(), from.shape());
87               recv_dev_context->CopyCPUTensorToDevice(
88                   &from, dst, to, wrapped_done, sync_dst_compute);
89               return Status::OK();
90             } else {
91               return status_cb->status();
92             }
93           }
94         };
95 
96     const Variant* v = input->flat<Variant>().data();
97     Variant* v_out = copy.flat<Variant>().data();
98     Status s_copy_init;
99     for (int64 i = 0; i < input->NumElements(); ++i) {
100       s_copy_init = VariantDeviceCopy(
101           VariantDeviceCopyDirection::HOST_TO_DEVICE, v[i], &v_out[i], copier);
102       if (!s_copy_init.ok()) {
103         status_cb->UpdateStatus(s_copy_init);
104         break;
105       }
106     }
107     if (s_copy_init.ok()) {
108       *output = std::move(copy);
109     }
110   } else if (input->dtype() == DT_RESOURCE) {
111     *output = *input;
112     done(Status::OK());
113   } else {
114     recv_dev_context->CopyCPUTensorToDevice(input, dst, output, std::move(done),
115                                             sync_dst_compute);
116   }
117 }
118 
119 
CopyDeviceToDevice(CopyTensor::CopyFunction copy_function,Allocator * cpu_allocator,Allocator * out_allocator,DeviceContext * send_dev_context,DeviceContext * recv_dev_context,Device * src,Device * dst,const AllocatorAttributes src_alloc_attr,const AllocatorAttributes dst_alloc_attr,const Tensor * input,Tensor * output,int dev_to_dev_stream_index,StatusCallback done)120 void CopyDeviceToDevice(CopyTensor::CopyFunction copy_function,
121                         Allocator* cpu_allocator, Allocator* out_allocator,
122                         DeviceContext* send_dev_context,
123                         DeviceContext* recv_dev_context, Device* src,
124                         Device* dst, const AllocatorAttributes src_alloc_attr,
125                         const AllocatorAttributes dst_alloc_attr,
126                         const Tensor* input, Tensor* output,
127                         int dev_to_dev_stream_index, StatusCallback done) {
128   if (input->dtype() == DT_VARIANT) {
129     Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
130     auto* status_cb = new ReffedStatusCallback(std::move(done));
131     core::ScopedUnref status_cb_unref(status_cb);
132 
133     auto wrapped_done = [status_cb](const Status& s) {
134       status_cb->UpdateStatus(s);
135       status_cb->Unref();
136     };
137     auto copier =
138         [copy_function, cpu_allocator, src, dst, src_alloc_attr, dst_alloc_attr,
139          recv_dev_context, send_dev_context, out_allocator, status_cb,
140          dev_to_dev_stream_index, wrapped_done = std::move(wrapped_done)](
141             // Begin unbound arguments
142             const Tensor& from, Tensor* to) {
143           if (from.dtype() == DT_VARIANT) {
144             status_cb->Ref();
145             CopyDeviceToDevice(copy_function, cpu_allocator, out_allocator,
146                                send_dev_context, recv_dev_context, src, dst,
147                                src_alloc_attr, dst_alloc_attr, &from, to,
148                                dev_to_dev_stream_index, wrapped_done);
149             return Status::OK();
150           } else {
151             if (!DMAHelper::CanUseDMA(&from)) {
152               Status err = errors::InvalidArgument(
153                   "During Variant Device->Device Copy: ", src->name(), " to ",
154                   dst->name(), " non-DMA-copy attempted of tensor type: ",
155                   DataTypeString(from.dtype()));
156               status_cb->UpdateStatus(err);
157               return err;
158             }
159             if (status_cb->ok()) {
160               status_cb->Ref();
161               *to = Tensor(out_allocator, from.dtype(), from.shape());
162               copy_function(send_dev_context, recv_dev_context, src, dst,
163                             src_alloc_attr, dst_alloc_attr, &from, to,
164                             dev_to_dev_stream_index, wrapped_done);
165               return Status::OK();
166             } else {
167               return status_cb->status();
168             }
169           }
170         };
171 
172     const Variant* v = input->flat<Variant>().data();
173     Variant* v_out = copy.flat<Variant>().data();
174     Status s_copy_init;
175     for (int64 i = 0; i < input->NumElements(); ++i) {
176       s_copy_init =
177           VariantDeviceCopy(VariantDeviceCopyDirection::DEVICE_TO_DEVICE, v[i],
178                             &v_out[i], copier);
179       if (!s_copy_init.ok()) {
180         status_cb->UpdateStatus(s_copy_init);
181         break;
182       }
183     }
184     if (s_copy_init.ok()) {
185       *output = std::move(copy);
186     }
187   } else if (input->dtype() == DT_RESOURCE) {
188     *output = *input;
189     done(Status::OK());
190   } else {
191     copy_function(send_dev_context, recv_dev_context, src, dst, src_alloc_attr,
192                   dst_alloc_attr, input, output, dev_to_dev_stream_index,
193                   std::move(done));
194   }
195 }
196 
197 }  // namespace
198 
199 // static
ViaDMA(StringPiece edge_name,DeviceContext * send_dev_context,DeviceContext * recv_dev_context,Device * src,Device * dst,const AllocatorAttributes src_alloc_attr,const AllocatorAttributes dst_alloc_attr,const Tensor * input,Tensor * output,int dev_to_dev_stream_index,StatusCallback done,bool sync_dst_compute)200 void CopyTensor::ViaDMA(StringPiece edge_name, DeviceContext* send_dev_context,
201                         DeviceContext* recv_dev_context, Device* src,
202                         Device* dst, const AllocatorAttributes src_alloc_attr,
203                         const AllocatorAttributes dst_alloc_attr,
204                         const Tensor* input, Tensor* output,
205                         int dev_to_dev_stream_index, StatusCallback done,
206                         bool sync_dst_compute) {
207   profiler::ScopedAnnotation annotation(
208       [&] { return absl::StrCat("#edge_name=", edge_name, "#"); });
209   VLOG(1) << "Copy " << edge_name;
210 
211   const DeviceType src_device_type(
212       src_alloc_attr.on_host() ? DEVICE_CPU : src->attributes().device_type());
213   const DeviceType dst_device_type(
214       dst_alloc_attr.on_host() ? DEVICE_CPU : dst->attributes().device_type());
215   const bool non_cpu_src = src_device_type != DeviceType(DEVICE_CPU);
216   const bool non_cpu_dst = dst_device_type != DeviceType(DEVICE_CPU);
217 
218   // TODO(phawkins): choose an allocator optimal for both the src and dst
219   // devices, not just the src device.
220   AllocatorAttributes host_alloc_attrs;
221   host_alloc_attrs.set_gpu_compatible(true);
222   host_alloc_attrs.set_on_host(true);
223   Allocator* cpu_allocator = src->GetAllocator(host_alloc_attrs);
224   Allocator* out_allocator = dst->GetAllocator(dst_alloc_attr);
225 
226   // E.g., gpu -> gpu
227   if (non_cpu_src && non_cpu_dst) {
228     // Device to device copy.  Look through registry for an appropriate
229     // CopyFunction.
230     std::vector<RegistrationInfo>* registry = MutableRegistry();
231     for (const RegistrationInfo& ri : *registry) {
232       if (ri.sender_device_type == src_device_type &&
233           ri.receiver_device_type == dst_device_type) {
234         CopyDeviceToDevice(ri.copy_function, cpu_allocator, out_allocator,
235                            send_dev_context, recv_dev_context, src, dst,
236                            src_alloc_attr, dst_alloc_attr, input, output,
237                            dev_to_dev_stream_index, std::move(done));
238         return;
239       }
240     }
241 
242     // Fall back to copying via the host.
243     VLOG(1) << "No function registered to copy from devices of type "
244             << src_device_type.type() << " to devices of type "
245             << dst_device_type.type()
246             << ". Falling back to copying via the host.";
247 
248     Tensor* cpu_tensor =
249         new Tensor(cpu_allocator, input->dtype(), input->shape());
250     auto delete_and_done = [cpu_tensor,
251                             done = std::move(done)](const Status& status) {
252       delete cpu_tensor;
253       done(status);
254     };
255     auto then_copy_to_other_device =
256         [delete_and_done = std::move(delete_and_done), recv_dev_context,
257          cpu_tensor, cpu_allocator, out_allocator, edge_name, dst, output,
258          sync_dst_compute](Status status) {
259           if (!status.ok()) {
260             delete_and_done(status);
261             return;
262           }
263           CopyHostToDevice(cpu_tensor, cpu_allocator, out_allocator, edge_name,
264                            dst, output, recv_dev_context,
265                            std::move(delete_and_done), sync_dst_compute);
266         };
267     CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
268                      cpu_tensor, send_dev_context,
269                      std::move(then_copy_to_other_device));
270     return;
271   }
272 
273   // E.g., gpu -> cpu
274   if (non_cpu_src && !non_cpu_dst) {
275     // Device to host copy.
276     CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
277                      output, send_dev_context, std::move(done));
278     return;
279   }
280 
281   // E.g., cpu -> gpu
282   if (!non_cpu_src && non_cpu_dst) {
283     // Host to Device copy.
284     CopyHostToDevice(input, cpu_allocator, out_allocator, edge_name, dst,
285                      output, recv_dev_context, std::move(done),
286                      sync_dst_compute);
287     return;
288   }
289 
290   // cpu -> cpu
291   CHECK(!non_cpu_src && !non_cpu_dst);
292   *output = *input;
293   done(Status::OK());
294 }
295 
296 // static
Register(DeviceType sender_device_type,DeviceType receiver_device_type,CopyFunction copy_function)297 Status CopyTensor::Register(DeviceType sender_device_type,
298                             DeviceType receiver_device_type,
299                             CopyFunction copy_function) {
300   std::vector<RegistrationInfo>* registry = MutableRegistry();
301   registry->emplace_back(sender_device_type, receiver_device_type,
302                          copy_function);
303   return Status::OK();
304 }
305 
306 namespace {
307 
308 // The following registrations enable a DT_VARIANT tensor element that contains
309 // a wrapped `tensorflow::Tensor` to be copied between devices.
WrappedTensorDeviceCopy(const Tensor & from,Tensor * to,const UnaryVariantOpRegistry::AsyncTensorDeviceCopyFn & copy)310 static Status WrappedTensorDeviceCopy(
311     const Tensor& from, Tensor* to,
312     const UnaryVariantOpRegistry::AsyncTensorDeviceCopyFn& copy) {
313   if (DMAHelper::CanUseDMA(&from)) {
314     TF_RETURN_IF_ERROR(copy(from, to));
315   } else {
316     *to = from;
317   }
318 
319   return Status::OK();
320 }
321 
322 #define REGISTER_WRAPPED_TENSOR_COPY(DIRECTION)         \
323   INTERNAL_REGISTER_UNARY_VARIANT_DEVICE_COPY_FUNCTION( \
324       Tensor, DIRECTION, WrappedTensorDeviceCopy)
325 
326 REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::HOST_TO_DEVICE);
327 REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::DEVICE_TO_HOST);
328 REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::DEVICE_TO_DEVICE);
329 
330 }  // namespace
331 
CopyDeviceToHost(const Tensor * input,Allocator * cpu_allocator,Allocator * out_allocator,StringPiece edge_name,Device * src,Tensor * output,DeviceContext * send_dev_context,StatusCallback done)332 void CopyDeviceToHost(const Tensor* input, Allocator* cpu_allocator,
333                       Allocator* out_allocator, StringPiece edge_name,
334                       Device* src, Tensor* output,
335                       DeviceContext* send_dev_context, StatusCallback done) {
336   if (input->dtype() == DT_VARIANT) {
337     Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
338     auto* status_cb = new ReffedStatusCallback(std::move(done));
339     core::ScopedUnref status_cb_unref(status_cb);
340 
341     auto wrapped_done = [status_cb](const Status& s) {
342       status_cb->UpdateStatus(s);
343       status_cb->Unref();
344     };
345     auto copier =
346         [edge_name, src, send_dev_context, out_allocator, status_cb,
347          cpu_allocator, wrapped_done = std::move(wrapped_done)](
348             const Tensor& from, Tensor* to) {
349           if (from.dtype() == DT_VARIANT) {
350             status_cb->Ref();
351             CopyDeviceToHost(&from, cpu_allocator, out_allocator, edge_name,
352                              src, to, send_dev_context, wrapped_done);
353             return Status::OK();
354           } else {
355             if (!DMAHelper::CanUseDMA(&from)) {
356               Status err = errors::InvalidArgument(
357                   "During Variant Device->Host Copy: "
358                   "non-DMA-copy attempted of tensor type: ",
359                   DataTypeString(from.dtype()));
360               status_cb->UpdateStatus(err);
361               return err;
362             }
363             if (status_cb->ok()) {
364               status_cb->Ref();
365               *to = Tensor(out_allocator, from.dtype(), from.shape());
366               send_dev_context->CopyDeviceTensorToCPU(&from, edge_name, src, to,
367                                                       wrapped_done);
368               return Status::OK();
369             } else {
370               return status_cb->status();
371             }
372           }
373         };
374 
375     const Variant* v = input->flat<Variant>().data();
376     Variant* v_out = copy.flat<Variant>().data();
377     Status s_copy_init;
378     for (int64 i = 0; i < input->NumElements(); ++i) {
379       s_copy_init = VariantDeviceCopy(
380           VariantDeviceCopyDirection::DEVICE_TO_HOST, v[i], &v_out[i], copier);
381       if (!s_copy_init.ok()) {
382         status_cb->UpdateStatus(s_copy_init);
383         break;
384       }
385     }
386     if (s_copy_init.ok()) {
387       *output = std::move(copy);
388     }
389   } else if (input->dtype() == DT_RESOURCE) {
390     *output = *input;
391     done(Status::OK());
392   } else {
393     send_dev_context->CopyDeviceTensorToCPU(input, edge_name, src, output,
394                                             std::move(done));
395   }
396 }
397 
398 }  // namespace tensorflow
399