1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/common_runtime/graph_view.h"
17 
18 #include <atomic>
19 #include <deque>
20 #include <memory>
21 #include <string>
22 #include <unordered_map>
23 #include <vector>
24 
25 #include "tensorflow/core/common_runtime/device.h"
26 #include "tensorflow/core/framework/node_def_util.h"
27 #include "tensorflow/core/framework/op_kernel.h"
28 #include "tensorflow/core/framework/tensor.h"
29 #include "tensorflow/core/graph/edgeset.h"
30 #include "tensorflow/core/graph/graph.h"
31 #include "tensorflow/core/lib/core/errors.h"
32 #include "tensorflow/core/lib/gtl/inlined_vector.h"
33 #include "tensorflow/core/lib/strings/str_util.h"
34 #include "tensorflow/core/util/device_name_utils.h"
35 
36 namespace tensorflow {
37 
DebugString() const38 string NodeItem::DebugString() const {
39   string ret = strings::StrCat("{name:'", kernel->name(), "' id:", node_id);
40   if (is_source) {
41     strings::StrAppend(&ret, " source}");
42   } else {
43     strings::StrAppend(&ret, " def:{", SummarizeNodeDef(kernel->def()), "}}");
44   }
45   return ret;
46 }
47 
~GraphView()48 GraphView::~GraphView() {
49   static_assert(std::is_trivially_destructible<AllocatorAttributes>::value,
50                 "Update code if AllocatorAttributes gains a destructor");
51   static_assert(std::is_trivially_destructible<EdgeInfo>::value,
52                 "Update code if EdgeInfo gains a destructor");
53   for (int i = 0; i < num_nodes_; i++) {
54     NodeItem* n = node(i);
55     if (n != nullptr) {
56       n->NodeItem::~NodeItem();
57       // Memory for "n" itself is held in space_ & gets cleaned up below
58     }
59   }
60   delete[] node_offsets_;
61   delete[] space_;
62 }
63 
64 namespace {
65 typedef std::tuple<int32, int32> OutputAndControlEdges;
66 
CountOutputEdges(const Node * n)67 OutputAndControlEdges CountOutputEdges(const Node* n) {
68   DCHECK_LE(n->out_edges().size(), kint32max);
69   int32 num_output_edges = 0;
70   int32 num_output_control_edges = 0;
71   for (auto e : n->out_edges()) {
72     if (IsSink(e->dst())) continue;
73     if (e->IsControlEdge()) {
74       ++num_output_control_edges;
75     } else {
76       ++num_output_edges;
77     }
78   }
79   return OutputAndControlEdges(num_output_edges, num_output_control_edges);
80 }
81 }  // namespace
82 
NodeItemBytes(const Node * n)83 size_t GraphView::NodeItemBytes(const Node* n) {
84   int32 num_output_edges;
85   int32 num_output_control_edges;
86   std::tie(num_output_edges, num_output_control_edges) = CountOutputEdges(n);
87   const int num_inputs = n->num_inputs();
88   const int num_outputs = n->num_outputs();
89 
90   // Compute number of bytes needed for NodeItem and variable length data.
91   // We do not subtract sizeof(var) since num_inputs/num_outputs might
92   // both be zero.
93   const size_t raw_bytes =
94       sizeof(NodeItem)                             // Fixed
95       + num_output_edges * sizeof(EdgeInfo)        // output_edges[...]
96       + num_output_control_edges *                 //
97             sizeof(ControlEdgeInfo)                // output_control_edges[...]
98       + num_outputs * sizeof(AllocatorAttributes)  // output_attr[...]
99       + num_outputs * sizeof(int)                  // forward_from[num_outputs]
100       + num_inputs * sizeof(uint8)                 // input_type[num_inputs]
101       + num_outputs * sizeof(uint8);               // output_type[num_outputs]
102   static constexpr size_t kItemAlignment = sizeof(NodeItem*);
103   static_assert(kItemAlignment % alignof(NodeItem) == 0,
104                 "NodeItem must be aligned with kItemAlignment");
105   static_assert(kItemAlignment % alignof(EdgeInfo) == 0,
106                 "EdgeInfo must be aligned with kItemAlignment");
107   static_assert(kItemAlignment % alignof(ControlEdgeInfo) == 0,
108                 "ControlEdgeInfo must be aligned with kItemAlignment");
109   static_assert(kItemAlignment % alignof(AllocatorAttributes) == 0,
110                 "AllocatorAttributes must be aligned with kItemAlignment");
111   static_assert(sizeof(NodeItem) % alignof(EdgeInfo) == 0,
112                 "NodeItem must be aligned with EdgeInfo");
113   static_assert(sizeof(NodeItem) % alignof(AllocatorAttributes) == 0,
114                 "NodeItem must be aligned with AllocatorAttributes");
115   static_assert(sizeof(EdgeInfo) % alignof(AllocatorAttributes) == 0,
116                 "EdgeInfo must be aligned with AllocatorAttributes");
117   const size_t bytes =
118       ((raw_bytes + kItemAlignment - 1) / kItemAlignment) * kItemAlignment;
119   return bytes;
120 }
121 
InitializeNode(char * ptr,const Node * n)122 char* GraphView::InitializeNode(char* ptr, const Node* n) {
123   const int id = n->id();
124   CHECK(node_offsets_[id] == kuint32max);  // Initial value in constructor
125 
126   const size_t bytes = NodeItemBytes(n);
127   constexpr size_t kItemAlignment = sizeof(NodeItem*);
128   CHECK_EQ(reinterpret_cast<uintptr_t>(ptr) % kItemAlignment, 0);
129   NodeItem* item = reinterpret_cast<NodeItem*>(ptr);
130 
131   // We store a 32-bit offset relative to the beginning of space_, so that we
132   // only need an array of 32-bit values to map from node id to the NodeItem*,
133   // (versus 64 bits on most machines if we just stored an array of NodeItem*
134   // pointers). Casting to int64 is needed on 32bit CPU to avoid comparing
135   // values as "int" vs "size_t" in CHECK_LE.
136   CHECK_LE(static_cast<int64>(ptr - space_), kuint32max);
137   const uint32 offset = static_cast<uint32>(ptr - space_);
138   node_offsets_[id] = offset;
139   ptr += bytes;
140 
141   int32 num_output_edges;
142   int32 num_output_control_edges;
143   std::tie(num_output_edges, num_output_control_edges) = CountOutputEdges(n);
144   const int num_inputs = n->num_inputs();
145   const int num_outputs = n->num_outputs();
146 
147   new (item) NodeItem();
148   item->num_inputs = num_inputs;
149   item->num_outputs = num_outputs;
150   item->num_output_edges = num_output_edges;
151   item->num_output_control_edges = num_output_control_edges;
152 
153   // Fill output edges.
154   // Keep track of the last EdgeInfo in the EdgeInfo array that references
155   // a given output slot.  For all but the last, we need to do a copy of the
156   // Tensor when propagating results downstream in the graph, but for the
157   // last one, we can just do a move of the Tensor object to propagate it.
158   gtl::InlinedVector<EdgeInfo*, 4> last_indices(num_outputs, nullptr);
159   EdgeInfo* dst_edge = item->output_edge_base();
160   for (auto e : n->out_edges()) {
161     if (e->IsControlEdge()) continue;
162     dst_edge->dst_id = e->dst()->id();
163     CHECK_LE(e->src_output(), 0x3FFFFFFF);  // Must fit in 31 bits
164     dst_edge->output_slot = e->src_output();
165     dst_edge->is_last = false;
166     const int output_slot = dst_edge->output_slot;
167     if (output_slot >= 0) {
168       last_indices[output_slot] = dst_edge;
169     }
170     // NOTE: The `input_slot` will be rewritten to the frame-wide offset later
171     // in `ExecutorImpl::Initialize()`.
172     dst_edge->input_slot = e->dst_input();
173     dst_edge++;
174   }
175   for (EdgeInfo* edge_info : last_indices) {
176     if (edge_info != nullptr) {
177       edge_info->is_last = true;
178     }
179   }
180   ControlEdgeInfo* dst_control_edge = item->output_control_edge_base();
181   for (auto e : n->out_edges()) {
182     if (!e->IsControlEdge() || IsSink(e->dst())) continue;
183     dst_control_edge->dst_id = e->dst()->id();
184     dst_control_edge++;
185   }
186 
187   AllocatorAttributes* output_attrs = item->output_attr_base();
188   for (int i = 0; i < num_outputs; i++) {
189     new (&output_attrs[i]) AllocatorAttributes();
190   }
191 
192   DCHECK_LT(DataType_MAX, 255);  // Must fit in uint8
193   uint8* input_types = item->input_type_base();
194   item->is_any_input_ref_typed = false;
195   for (int i = 0; i < num_inputs; i++) {
196     input_types[i] = static_cast<uint8>(n->input_type(i));
197     DCHECK_EQ(item->input_type(i), n->input_type(i));
198     item->is_any_input_ref_typed |= IsRefType(n->input_type(i));
199   }
200 
201   // Check ScopedAllocatorAttrs and forward_from.  Also assign output_types.
202   {
203     std::vector<int> forward_input;
204     Status fwd_status =
205         GetNodeAttr(n->attrs(), "_forward_input", &forward_input);
206     std::vector<int> scoped_allocator_attrs;
207     Status sa_status =
208         GetNodeAttr(n->attrs(), "_scoped_allocator", &scoped_allocator_attrs);
209 
210     int* forward_from = item->forward_from_base();
211     uint8* output_types = item->output_type_base();
212     for (int i = 0; i < num_outputs; ++i) {
213       output_types[i] = static_cast<uint8>(n->output_type(i));
214       DCHECK_EQ(item->output_type(i), n->output_type(i));
215 
216       forward_from[i] = OpKernelContext::Params::kNoReservation;
217       if (sa_status.ok()) {
218         for (int j = 0; j < scoped_allocator_attrs.size(); j += 2) {
219           if (scoped_allocator_attrs[j] == i) {
220             // This output slot must be explicitly allocated from a
221             // ScopedAllocator.
222             forward_from[i] = OpKernelContext::Params::kNeverForward;
223             DCHECK_EQ(output_attrs[i].scope_id, 0);
224             output_attrs[i].scope_id = scoped_allocator_attrs[j + 1];
225           }
226         }
227       }
228       if (fwd_status.ok() &&
229           forward_from[i] == OpKernelContext::Params::kNoReservation) {
230         DCHECK_EQ(forward_input.size() % 2, 0);
231         for (int j = 0; j < forward_input.size(); j += 2) {
232           if (forward_input[j + 1] == i) {
233             DCHECK_EQ(forward_from[i], OpKernelContext::Params::kNoReservation);
234             forward_from[i] = forward_input[j];
235             break;
236           }
237         }
238       }
239     }
240   }
241 
242   return ptr;
243 }
244 
Initialize(const Graph * g)245 Status GraphView::Initialize(const Graph* g) {
246   CHECK(node_offsets_ == nullptr);
247   const int num_nodes = g->num_node_ids();
248   num_nodes_ = num_nodes;
249   size_t total_bytes = 0;
250   for (const Node* n : g->nodes()) {
251     if (n->out_edges().size() > kint32max) {
252       return errors::InvalidArgument(
253           "The executor cannot handle nodes with more than ", kint32max,
254           " output edges. Node ", n->name(), " had ", n->out_edges().size(),
255           " output edges.");
256     }
257     total_bytes += NodeItemBytes(n);
258   }
259 
260   node_offsets_ = new uint32[num_nodes];
261   for (int i = 0; i < num_nodes; i++) {
262     node_offsets_[i] = kuint32max;
263   }
264 
265   space_ = new char[total_bytes];  // NodeItem objects are allocated here
266   char* ptr = space_;
267   for (const Node* n : g->nodes()) {
268     ptr = InitializeNode(ptr, n);
269   }
270   CHECK_EQ(ptr, space_ + total_bytes);
271   return Status::OK();
272 }
273 
274 namespace {
275 // If a Node has been marked to use a ScopedAllocator x for output i, then
276 // sc_attr will contain the subsequence (i, x) at an even offset.  This function
277 // extracts and transfers that ScopedAllocator id to alloc_attr.  For now, we
278 // only allow one ScopedAllocator use per Node.
ExtractScopedAllocatorAttr(const std::vector<int> & sc_attr,int output_index,AllocatorAttributes * alloc_attr)279 bool ExtractScopedAllocatorAttr(const std::vector<int>& sc_attr,
280                                 int output_index,
281                                 AllocatorAttributes* alloc_attr) {
282   DCHECK_LE(2, sc_attr.size());
283   for (int i = 0; i < sc_attr.size(); i += 2) {
284     if (sc_attr[i] == output_index) {
285       CHECK_EQ(alloc_attr->scope_id, 0);
286       alloc_attr->scope_id = sc_attr[i + 1];
287       return true;
288     }
289   }
290   return false;
291 }
292 }  // namespace
293 
SetScopedAllocatorAttrs(const std::vector<const Node * > & sa_nodes)294 void GraphView::SetScopedAllocatorAttrs(
295     const std::vector<const Node*>& sa_nodes) {
296   for (const Node* sa : sa_nodes) {
297     NodeItem* sa_item = node(sa->id());
298     AllocatorAttributes* sa_attrs = sa_item->output_attr_base();
299     // Control edges out of the ScopedAllocator should be use instances, but may
300     // include a few other nodes.
301     for (const auto& e : sa->out_edges()) {
302       if (IsSink(e->dst()) || !e->IsControlEdge()) {
303         continue;
304       }
305       Node* use_node = e->dst();
306       NodeItem* item = node(use_node->id());
307       AllocatorAttributes* use_attrs = item->output_attr_base();
308       std::vector<int> scoped_allocator_attrs;
309       Status s = GetNodeAttr(use_node->attrs(), "_scoped_allocator",
310                              &scoped_allocator_attrs);
311       if (!s.ok()) {
312         VLOG(2) << "Failed to find expected ScopedAllocator attr on "
313                 << use_node->name();
314         continue;
315       }
316       // There can be more than one output using ScopedAllocation, but this
317       // analysis assumes they use the same ScopedAllocator.
318       for (const auto& e : use_node->out_edges()) {
319         if (IsSink(e->dst()) || !e->IsControlEdge()) {
320           AllocatorAttributes attr;
321           if (ExtractScopedAllocatorAttr(scoped_allocator_attrs,
322                                          e->src_output(), &attr)) {
323             // Set the scope_id on this use instance node.
324             (use_attrs + e->src_output())->Merge(attr);
325             // Propagate the other attributes of this node back to the SA node.
326             attr = *(use_attrs + e->src_output());
327             attr.scope_id = 0;
328             sa_attrs->Merge(attr);
329           }
330         }
331       }
332     }
333   }
334 }
335 
336 namespace {
InferAllocAttr(const Node * n,const Node * dst,const DeviceNameUtils::ParsedName & local_dev_name,AllocatorAttributes * attr)337 Status InferAllocAttr(const Node* n, const Node* dst,
338                       const DeviceNameUtils::ParsedName& local_dev_name,
339                       AllocatorAttributes* attr) {
340   Status s;
341   // Note that it's possible for *n to be a Recv and *dst to be a Send,
342   // so these two cases are not mutually exclusive.
343   if (IsRecv(n)) {
344     string src_name;
345     s = GetNodeAttr(n->attrs(), "send_device", &src_name);
346     if (!s.ok()) return s;
347     DeviceNameUtils::ParsedName parsed_src_name;
348     if (!DeviceNameUtils::ParseFullName(src_name, &parsed_src_name)) {
349       s = errors::Internal("Bad send_device attr '", src_name, "' in node ",
350                            n->name());
351       return s;
352     }
353     if (!DeviceNameUtils::IsSameAddressSpace(parsed_src_name, local_dev_name)) {
354       // Value is going to be the sink of an RPC.
355       attr->set_nic_compatible(true);
356       VLOG(2) << "node " << n->name() << " is the sink of an RPC in";
357     } else if ((local_dev_name.type == "CPU" || n->IsHostRecv()) &&
358                parsed_src_name.type != "CPU") {
359       // Value is going to be the sink of a local DMA from GPU to CPU (or
360       // other types of accelerators).
361       attr->set_gpu_compatible(true);
362       VLOG(2) << "node " << n->name() << " is the sink of a gpu->cpu copy";
363     } else {
364       VLOG(2) << "default alloc case local type " << local_dev_name.type
365               << " remote type " << parsed_src_name.type;
366     }
367   }
368   if (IsSend(dst)) {
369     string dst_name;
370     s = GetNodeAttr(dst->attrs(), "recv_device", &dst_name);
371     if (!s.ok()) return s;
372     DeviceNameUtils::ParsedName parsed_dst_name;
373     if (!DeviceNameUtils::ParseFullName(dst_name, &parsed_dst_name)) {
374       s = errors::Internal("Bad recv_device attr '", dst_name, "' in node ",
375                            n->name());
376       return s;
377     }
378     if (!DeviceNameUtils::IsSameAddressSpace(parsed_dst_name, local_dev_name)) {
379       // Value is going to be the source of an RPC.
380       attr->set_nic_compatible(true);
381       VLOG(2) << "node " << n->name() << " is the source of an RPC out";
382     } else if ((local_dev_name.type == "CPU" || dst->IsHostSend()) &&
383                parsed_dst_name.type != "CPU") {
384       // Value is going to be the source of a local DMA from CPU to GPU (or
385       // other types of accelerators).
386       // Note that this does not cover the case where the allocation of the
387       // output tensor is not generated by the src: n.
388       attr->set_gpu_compatible(true);
389       VLOG(2) << "node " << n->name() << " is the source of a cpu->gpu copy";
390     } else {
391       VLOG(2) << "default alloc case local type " << local_dev_name.type
392               << " remote type " << parsed_dst_name.type;
393     }
394   }
395   if (n->IsCollective()) {
396     // We'll make the sweeping assumption that any collective op is going
397     // to be involved in network i/o.
398     attr->set_nic_compatible(true);
399   }
400   return s;
401 }
402 }  // namespace
403 
SetAllocAttrs(const Graph * g,const Device * device)404 Status GraphView::SetAllocAttrs(const Graph* g, const Device* device) {
405   Status s;
406   DeviceNameUtils::ParsedName local_dev_name = device->parsed_name();
407 
408   std::vector<const Node*> scoped_allocator_instances;
409   for (const Node* n : g->nodes()) {
410     NodeItem* item = node(n->id());
411     AllocatorAttributes* attrs = item->output_attr_base();
412     if (IsScopedAllocator(n)) {
413       scoped_allocator_instances.push_back(n);
414     }
415 
416     // Examine the out edges of each node looking for special use
417     // cases that may affect memory allocation attributes.
418     for (const auto& e : n->out_edges()) {
419       if (!e->IsControlEdge()) {
420         AllocatorAttributes attr;
421         s = InferAllocAttr(n, e->dst(), local_dev_name, &attr);
422         if (!s.ok()) return s;
423         if (attr.value != 0 || attr.scope_id != 0) {
424           attrs[e->src_output()].Merge(attr);
425         }
426       }
427     }
428 
429     for (int out = 0; out < n->num_outputs(); out++) {
430       const OpKernel* op_kernel = item->kernel;
431       DCHECK_LT(out, op_kernel->output_memory_types().size());
432       bool on_host = op_kernel->output_memory_types()[out] == HOST_MEMORY;
433       if (on_host) {
434         AllocatorAttributes h;
435         h.set_on_host(on_host);
436         attrs[out].Merge(h);
437       }
438     }
439   }
440   SetScopedAllocatorAttrs(scoped_allocator_instances);
441   return s;
442 }
443 
444 }  // namespace tensorflow
445