/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/copy_tensor.h"

#include <atomic>
#include <utility>
#include <vector>
#include "tensorflow/core/common_runtime/dma_helper.h"
#include "tensorflow/core/framework/variant_op_registry.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/util/reffed_status_callback.h"

namespace tensorflow {
namespace {

struct RegistrationInfo {
  RegistrationInfo(DeviceType s, DeviceType r, CopyTensor::CopyFunction cf)
      : sender_device_type(std::move(s)),
        receiver_device_type(std::move(r)),
        copy_function(cf) {}
  DeviceType sender_device_type;
  DeviceType receiver_device_type;
  CopyTensor::CopyFunction copy_function;
};

// We use a vector instead of a map since we expect there to be very
// few registrations.
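// Lookups are a linear scan, which is cheap at this size. The vector is a
// function-local static that is never deleted, which sidesteps
// static-destruction-order issues at process exit.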
std::vector<RegistrationInfo>* MutableRegistry() {
  static std::vector<RegistrationInfo>* registry =
      new std::vector<RegistrationInfo>;
  return registry;
}

void CopyHostToDevice(const Tensor* input, Allocator* cpu_allocator,
                      Allocator* out_allocator, StringPiece edge_name,
                      Device* dst, Tensor* output,
                      DeviceContext* recv_dev_context, StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

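    // Ref-counting protocol: status_cb starts with one reference, released by
    // status_cb_unref when this function returns. Each per-element async copy
    // takes an extra Ref() before launching and releases it in wrapped_done
    // when the device copy completes, so `done` fires exactly once, after the
    // last outstanding copy (or immediately if none were launched).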
    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [dst, recv_dev_context, out_allocator, status_cb](
            StatusCallback wrapped_done_,
            // Begin unbound arguments
            const Tensor& from, Tensor* to) {
          if (!DMAHelper::CanUseDMA(&from)) {
            Status err = errors::InvalidArgument(
                "During Variant Host->Device Copy: "
                "non-DMA-copy attempted of tensor type: ",
                DataTypeString(from.dtype()));
            status_cb->UpdateStatus(err);
            return err;
          }
          if (status_cb->ok()) {
            status_cb->Ref();
            *to = Tensor(out_allocator, from.dtype(), from.shape());
            recv_dev_context->CopyCPUTensorToDevice(&from, dst, to,
                                                    wrapped_done_);
            return Status::OK();
          } else {
            return status_cb->status();
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

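    // VariantDeviceCopy dispatches on each element's underlying type and
    // invokes `copier` for the dense tensors contained in that variant.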
    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init = VariantDeviceCopy(
          VariantDeviceCopyDirection::HOST_TO_DEVICE, v[i], &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    recv_dev_context->CopyCPUTensorToDevice(input, dst, output,
                                            std::move(done));
  }
}

void CopyDeviceToHost(const Tensor* input, Allocator* cpu_allocator,
                      Allocator* out_allocator, StringPiece edge_name,
                      Device* src, Tensor* output,
                      DeviceContext* send_dev_context, StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

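    // Same ref-counting protocol as in CopyHostToDevice above.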
    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [edge_name, src, send_dev_context, out_allocator, status_cb](
            StatusCallback wrapped_done_,
            // Begin unbound arguments
            const Tensor& from, Tensor* to) {
          if (!DMAHelper::CanUseDMA(&from)) {
            Status err = errors::InvalidArgument(
                "During Variant Device->Host Copy: "
                "non-DMA-copy attempted of tensor type: ",
                DataTypeString(from.dtype()));
            status_cb->UpdateStatus(err);
            return err;
          }
          if (status_cb->ok()) {
            status_cb->Ref();
            *to = Tensor(out_allocator, from.dtype(), from.shape());
            send_dev_context->CopyDeviceTensorToCPU(&from, edge_name, src, to,
                                                    wrapped_done_);
            return Status::OK();
          } else {
            return status_cb->status();
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init = VariantDeviceCopy(
          VariantDeviceCopyDirection::DEVICE_TO_HOST, v[i], &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    send_dev_context->CopyDeviceTensorToCPU(input, edge_name, src, output,
                                            std::move(done));
  }
}

void CopyDeviceToDevice(CopyTensor::CopyFunction copy_function,
                        Allocator* cpu_allocator, Allocator* out_allocator,
                        DeviceContext* send_dev_context,
                        DeviceContext* recv_dev_context, Device* src,
                        Device* dst, const AllocatorAttributes src_alloc_attr,
                        const AllocatorAttributes dst_alloc_attr,
                        const Tensor* input, Tensor* output,
                        StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

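    // Same ref-counting protocol as in CopyHostToDevice above; here the
    // element copies go through `copy_function`, the CopyFunction registered
    // for this (src, dst) device-type pair.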
    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [copy_function, src, dst, src_alloc_attr, dst_alloc_attr,
         recv_dev_context, send_dev_context, out_allocator,
         status_cb](StatusCallback wrapped_done_,
                    // Begin unbound arguments
                    const Tensor& from, Tensor* to) {
          if (!DMAHelper::CanUseDMA(&from)) {
            Status err = errors::InvalidArgument(
                "During Variant Device->Device Copy: "
                "non-DMA-copy attempted of tensor type: ",
                DataTypeString(from.dtype()));
            status_cb->UpdateStatus(err);
            return err;
          }
          if (status_cb->ok()) {
            status_cb->Ref();
            *to = Tensor(out_allocator, from.dtype(), from.shape());
            copy_function(send_dev_context, recv_dev_context, src, dst,
                          src_alloc_attr, dst_alloc_attr, &from, to,
                          std::move(wrapped_done_));
            return Status::OK();
          } else {
            return status_cb->status();
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init =
          VariantDeviceCopy(VariantDeviceCopyDirection::DEVICE_TO_DEVICE, v[i],
                            &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    copy_function(send_dev_context, recv_dev_context, src, dst, src_alloc_attr,
                  dst_alloc_attr, input, output, std::move(done));
  }
}

}  // namespace

// static
void CopyTensor::ViaDMA(StringPiece edge_name, DeviceContext* send_dev_context,
                        DeviceContext* recv_dev_context, Device* src,
                        Device* dst, const AllocatorAttributes src_alloc_attr,
                        const AllocatorAttributes dst_alloc_attr,
                        const Tensor* input, Tensor* output,
                        StatusCallback done) {
  port::Tracing::ScopedAnnotation annotation(edge_name);
  VLOG(1) << "Copy " << edge_name;

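  // Route based on effective memory location, not just device type: a tensor
  // whose allocator attributes are marked on_host lives in host memory even
  // when it belongs to a non-CPU device, so it is treated as a CPU tensor
  // here.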
  const DeviceType src_device_type(
      src_alloc_attr.on_host() ? DEVICE_CPU : src->attributes().device_type());
  const DeviceType dst_device_type(
      dst_alloc_attr.on_host() ? DEVICE_CPU : dst->attributes().device_type());
  const bool non_cpu_src = src_device_type != DeviceType(DEVICE_CPU);
  const bool non_cpu_dst = dst_device_type != DeviceType(DEVICE_CPU);

  // TODO(phawkins): choose an allocator optimal for both the src and dst
  // devices, not just the src device.
  AllocatorAttributes host_alloc_attrs;
  host_alloc_attrs.set_gpu_compatible(true);
  host_alloc_attrs.set_on_host(true);
  Allocator* cpu_allocator = src->GetAllocator(host_alloc_attrs);
  Allocator* out_allocator = dst->GetAllocator(dst_alloc_attr);

  // E.g., gpu -> gpu
  if (non_cpu_src && non_cpu_dst) {
    // Device to device copy. Look through registry for an appropriate
    // CopyFunction.
    std::vector<RegistrationInfo>* registry = MutableRegistry();
    for (const RegistrationInfo& ri : *registry) {
      if (ri.sender_device_type == src_device_type &&
          ri.receiver_device_type == dst_device_type) {
        CopyDeviceToDevice(ri.copy_function, cpu_allocator, out_allocator,
                           send_dev_context, recv_dev_context, src, dst,
                           src_alloc_attr, dst_alloc_attr, input, output,
                           std::move(done));
        return;
      }
    }

    // Fall back to copying via the host.
    VLOG(1) << "No function registered to copy from devices of type "
            << src_device_type.type() << " to devices of type "
            << dst_device_type.type()
            << ". Falling back to copying via the host.";

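    // The staging tensor is heap-allocated because it must outlive this
    // function: both copies below are asynchronous. It is deleted in
    // delete_and_done once the final callback has run.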
    Tensor* cpu_tensor =
        new Tensor(cpu_allocator, input->dtype(), input->shape());
    std::function<void(const Status&)> delete_and_done = std::bind(
        [cpu_tensor](StatusCallback done_,
                     // Begin unbound arguments.
                     const Status& status) {
          delete cpu_tensor;
          done_(status);
        },
        std::move(done), std::placeholders::_1);
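    // Chain the two hops: run the device->host copy first; on success, start
    // the host->device copy, and in all cases eventually invoke
    // delete_and_done.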
    std::function<void(const Status&)> then_copy_to_other_device = std::bind(
        [delete_and_done, recv_dev_context, cpu_tensor, cpu_allocator,
         out_allocator, edge_name, dst, output](StatusCallback delete_and_done_,
                                                // Begin unbound arguments.
                                                Status status) {
          if (!status.ok()) {
            delete_and_done_(status);
            return;
          }
          CopyHostToDevice(cpu_tensor, cpu_allocator, out_allocator, edge_name,
                           dst, output, recv_dev_context,
                           std::move(delete_and_done_));
        },
        std::move(delete_and_done), std::placeholders::_1);
    CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
                     cpu_tensor, send_dev_context,
                     std::move(then_copy_to_other_device));
    return;
  }

  // E.g., gpu -> cpu
  if (non_cpu_src && !non_cpu_dst) {
    // Device to host copy.
    CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
                     output, send_dev_context, std::move(done));
    return;
  }

  // E.g., cpu -> gpu
  if (!non_cpu_src && non_cpu_dst) {
    // Host to Device copy.
    CopyHostToDevice(input, cpu_allocator, out_allocator, edge_name, dst,
                     output, recv_dev_context, std::move(done));
    return;
  }

  // cpu -> cpu
  CHECK(!non_cpu_src && !non_cpu_dst);
  *output = *input;
  done(Status::OK());
}

// static
Status CopyTensor::Register(DeviceType sender_device_type,
                            DeviceType receiver_device_type,
                            CopyFunction copy_function) {
  std::vector<RegistrationInfo>* registry = MutableRegistry();
  registry->emplace_back(sender_device_type, receiver_device_type,
                         copy_function);
  return Status::OK();
}

}  // namespace tensorflow