/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/copy_tensor.h"

#include <atomic>
#include <functional>
#include <utility>
#include <vector>
#include "tensorflow/core/common_runtime/dma_helper.h"
#include "tensorflow/core/framework/variant_op_registry.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/util/reffed_status_callback.h"

namespace tensorflow {
namespace {

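// A (sender device type, receiver device type, copy function) entry in the
// copy-function registry consulted by CopyTensor::ViaDMA.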
struct RegistrationInfo {
  RegistrationInfo(DeviceType s, DeviceType r, CopyTensor::CopyFunction cf)
      : sender_device_type(std::move(s)),
        receiver_device_type(std::move(r)),
        copy_function(cf) {}
  DeviceType sender_device_type;
  DeviceType receiver_device_type;
  CopyTensor::CopyFunction copy_function;
};

// We use a vector instead of a map since we expect there to be very
// few registrations.
std::vector<RegistrationInfo>* MutableRegistry() {
  static std::vector<RegistrationInfo>* registry =
      new std::vector<RegistrationInfo>;
  return registry;
}

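// Copies `input` (on host) to `output` on device `dst` via `recv_dev_context`,
// calling `done` when the copy completes. DT_VARIANT tensors are copied
// element by element through VariantDeviceCopy; all other dtypes are handed
// directly to CopyCPUTensorToDevice.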
void CopyHostToDevice(const Tensor* input, Allocator* cpu_allocator,
                      Allocator* out_allocator, StringPiece edge_name,
                      Device* dst, Tensor* output,
                      DeviceContext* recv_dev_context, StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [dst, recv_dev_context, out_allocator, status_cb](
            StatusCallback wrapped_done_,
            // Begin unbound arguments
            const Tensor& from, Tensor* to) {
          if (!DMAHelper::CanUseDMA(&from)) {
            Status err = errors::InvalidArgument(
                "During Variant Host->Device Copy: "
                "non-DMA-copy attempted of tensor type: ",
                DataTypeString(from.dtype()));
            status_cb->UpdateStatus(err);
            return err;
          }
          if (status_cb->ok()) {
            status_cb->Ref();
            *to = Tensor(out_allocator, from.dtype(), from.shape());
            recv_dev_context->CopyCPUTensorToDevice(&from, dst, to,
                                                    wrapped_done_);
            return Status::OK();
          } else {
            return status_cb->status();
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init = VariantDeviceCopy(
          VariantDeviceCopyDirection::HOST_TO_DEVICE, v[i], &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    recv_dev_context->CopyCPUTensorToDevice(input, dst, output,
                                            std::move(done));
  }
}

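// Copies `input` from device `src` to `output` on the host via
// `send_dev_context`, calling `done` when the copy completes. DT_VARIANT
// tensors are copied element by element through VariantDeviceCopy; all other
// dtypes are handed directly to CopyDeviceTensorToCPU.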
void CopyDeviceToHost(const Tensor* input, Allocator* cpu_allocator,
                      Allocator* out_allocator, StringPiece edge_name,
                      Device* src, Tensor* output,
                      DeviceContext* send_dev_context, StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [edge_name, src, send_dev_context, out_allocator, status_cb](
            StatusCallback wrapped_done_,
            // Begin unbound arguments
            const Tensor& from, Tensor* to) {
          if (!DMAHelper::CanUseDMA(&from)) {
            Status err = errors::InvalidArgument(
                "During Variant Device->Host Copy: "
                "non-DMA-copy attempted of tensor type: ",
                DataTypeString(from.dtype()));
            status_cb->UpdateStatus(err);
            return err;
          }
          if (status_cb->ok()) {
            status_cb->Ref();
            *to = Tensor(out_allocator, from.dtype(), from.shape());
            send_dev_context->CopyDeviceTensorToCPU(&from, edge_name, src, to,
                                                    wrapped_done_);
            return Status::OK();
          } else {
            return status_cb->status();
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init = VariantDeviceCopy(
          VariantDeviceCopyDirection::DEVICE_TO_HOST, v[i], &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    send_dev_context->CopyDeviceTensorToCPU(input, edge_name, src, output,
                                            std::move(done));
  }
}

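// Copies `input` on device `src` to `output` on device `dst` using the
// registered `copy_function`, calling `done` when the copy completes.
// DT_VARIANT tensors are copied element by element through VariantDeviceCopy;
// all other dtypes are handed directly to `copy_function`.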
void CopyDeviceToDevice(CopyTensor::CopyFunction copy_function,
                        Allocator* cpu_allocator, Allocator* out_allocator,
                        DeviceContext* send_dev_context,
                        DeviceContext* recv_dev_context, Device* src,
                        Device* dst, const AllocatorAttributes src_alloc_attr,
                        const AllocatorAttributes dst_alloc_attr,
                        const Tensor* input, Tensor* output,
                        StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [copy_function, src, dst, src_alloc_attr, dst_alloc_attr,
         recv_dev_context, send_dev_context, out_allocator,
         status_cb](StatusCallback wrapped_done_,
                    // Begin unbound arguments
                    const Tensor& from, Tensor* to) {
          if (!DMAHelper::CanUseDMA(&from)) {
            Status err = errors::InvalidArgument(
                "During Variant Device->Device Copy: "
                "non-DMA-copy attempted of tensor type: ",
                DataTypeString(from.dtype()));
            status_cb->UpdateStatus(err);
            return err;
          }
          if (status_cb->ok()) {
            status_cb->Ref();
            *to = Tensor(out_allocator, from.dtype(), from.shape());
            copy_function(send_dev_context, recv_dev_context, src, dst,
                          src_alloc_attr, dst_alloc_attr, &from, to,
                          std::move(wrapped_done_));
            return Status::OK();
          } else {
            return status_cb->status();
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init =
          VariantDeviceCopy(VariantDeviceCopyDirection::DEVICE_TO_DEVICE, v[i],
                            &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    copy_function(send_dev_context, recv_dev_context, src, dst, src_alloc_attr,
                  dst_alloc_attr, input, output, std::move(done));
  }
}

}  // namespace

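// Copies `input` to `output`, dispatching on the source and destination
// device types: device->device copies use a CopyFunction from the registry
// (falling back to staging through a temporary host tensor if none is
// registered), device->host and host->device copies go through the
// corresponding DeviceContext methods, and host->host copies are a plain
// Tensor assignment.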
// static
void CopyTensor::ViaDMA(StringPiece edge_name, DeviceContext* send_dev_context,
                        DeviceContext* recv_dev_context, Device* src,
                        Device* dst, const AllocatorAttributes src_alloc_attr,
                        const AllocatorAttributes dst_alloc_attr,
                        const Tensor* input, Tensor* output,
                        StatusCallback done) {
  port::Tracing::ScopedAnnotation annotation(edge_name);
  VLOG(1) << "Copy " << edge_name;

  const DeviceType src_device_type(
      src_alloc_attr.on_host() ? DEVICE_CPU : src->attributes().device_type());
  const DeviceType dst_device_type(
      dst_alloc_attr.on_host() ? DEVICE_CPU : dst->attributes().device_type());
  const bool non_cpu_src = src_device_type != DeviceType(DEVICE_CPU);
  const bool non_cpu_dst = dst_device_type != DeviceType(DEVICE_CPU);

  // TODO(phawkins): choose an allocator optimal for both the src and dst
  // devices, not just the src device.
  AllocatorAttributes host_alloc_attrs;
  host_alloc_attrs.set_gpu_compatible(true);
  host_alloc_attrs.set_on_host(true);
  Allocator* cpu_allocator = src->GetAllocator(host_alloc_attrs);
  Allocator* out_allocator = dst->GetAllocator(dst_alloc_attr);

  // E.g., gpu -> gpu
  if (non_cpu_src && non_cpu_dst) {
    // Device to device copy.  Look through registry for an appropriate
    // CopyFunction.
    std::vector<RegistrationInfo>* registry = MutableRegistry();
    for (const RegistrationInfo& ri : *registry) {
      if (ri.sender_device_type == src_device_type &&
          ri.receiver_device_type == dst_device_type) {
        CopyDeviceToDevice(ri.copy_function, cpu_allocator, out_allocator,
                           send_dev_context, recv_dev_context, src, dst,
                           src_alloc_attr, dst_alloc_attr, input, output,
                           std::move(done));
        return;
      }
    }

    // Fall back to copying via the host.
    VLOG(1) << "No function registered to copy from devices of type "
            << src_device_type.type() << " to devices of type "
            << dst_device_type.type()
            << ". Falling back to copying via the host.";

    Tensor* cpu_tensor =
        new Tensor(cpu_allocator, input->dtype(), input->shape());
    std::function<void(const Status&)> delete_and_done = std::bind(
        [cpu_tensor](StatusCallback done_,
                     // Begin unbound arguments.
                     const Status& status) {
          delete cpu_tensor;
          done_(status);
        },
        std::move(done), std::placeholders::_1);
    std::function<void(const Status&)> then_copy_to_other_device = std::bind(
        [delete_and_done, recv_dev_context, cpu_tensor, cpu_allocator,
         out_allocator, edge_name, dst, output](StatusCallback delete_and_done_,
                                                // Begin unbound arguments.
                                                Status status) {
          if (!status.ok()) {
            delete_and_done_(status);
            return;
          }
          CopyHostToDevice(cpu_tensor, cpu_allocator, out_allocator, edge_name,
                           dst, output, recv_dev_context,
                           std::move(delete_and_done_));
        },
        std::move(delete_and_done), std::placeholders::_1);
    CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
                     cpu_tensor, send_dev_context,
                     std::move(then_copy_to_other_device));
    return;
  }

  // E.g., gpu -> cpu
  if (non_cpu_src && !non_cpu_dst) {
    // Device to host copy.
    CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
                     output, send_dev_context, std::move(done));
    return;
  }

  // E.g., cpu -> gpu
  if (!non_cpu_src && non_cpu_dst) {
    // Host to Device copy.
    CopyHostToDevice(input, cpu_allocator, out_allocator, edge_name, dst,
                     output, recv_dev_context, std::move(done));
    return;
  }

  // cpu -> cpu
  CHECK(!non_cpu_src && !non_cpu_dst);
  *output = *input;
  done(Status::OK());
}

// static
Status CopyTensor::Register(DeviceType sender_device_type,
                            DeviceType receiver_device_type,
                            CopyFunction copy_function) {
  std::vector<RegistrationInfo>* registry = MutableRegistry();
  registry->emplace_back(sender_device_type, receiver_device_type,
                         copy_function);
  return Status::OK();
}
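
// Example of use (a sketch only; MY_DEVICE and MyDeviceToDeviceCopy are
// hypothetical placeholders, not symbols defined by TensorFlow): a device
// backend would typically call Register once at static-initialization time so
// that ViaDMA can find its copy function, e.g.
//
//   static const Status my_copy_registration = CopyTensor::Register(
//       DeviceType("MY_DEVICE"), DeviceType("MY_DEVICE"),
//       MyDeviceToDeviceCopy);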

}  // namespace tensorflow