1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/core/common_runtime/placer_inspection_required_ops_utils.h"
16
17 #include <unordered_map>
18 #include <unordered_set>
19
20 #include "absl/strings/str_cat.h"
21 #include "absl/types/optional.h"
22 #include "tensorflow/core/framework/function.h"
23 #include "tensorflow/core/framework/node_def_builder.h"
24 #include "tensorflow/core/graph/graph.h"
25 #include "tensorflow/core/lib/core/errors.h"
26 #include "tensorflow/core/lib/core/status.h"
27
28 namespace tensorflow {
29 namespace {
30
IsFunctionCall(const Node & node)31 bool IsFunctionCall(const Node& node) {
32 // TODO(iga): Handle non-PCO functions when we add multi-device support
33 // to regular function calls. Also, the GetFunctionDefAndAttrs assumes that
34 // the function name is stored in the `f` attribute of the node. That code
35 // will need to change as well.
36 const string& op_type = node.op_def().name();
37 return op_type == "PartitionedCall" || op_type == "StatefulPartitionedCall";
38 }
39
40 // Utility to set node's value in `cache` and `is_deep` to `value`.
Set(const Node & node,bool value,bool * is_deep,std::vector<absl::optional<bool>> * cache)41 Status Set(const Node& node, bool value, bool* is_deep,
42 std::vector<absl::optional<bool>>* cache) {
43 *is_deep = value;
44 (*cache)[node.id()] = value;
45 return Status::OK();
46 }
47
48 } // namespace
49
PlacerInspectionRequiredOpChecker(const Graph * graph)50 PlacerInspectionRequiredOpChecker::PlacerInspectionRequiredOpChecker(
51 const Graph* graph)
52 : PlacerInspectionRequiredOpChecker(graph, &graph->flib_def()) {}
53
PlacerInspectionRequiredOpChecker(const Graph * graph,const FunctionLibraryDefinition * flib_def)54 PlacerInspectionRequiredOpChecker::PlacerInspectionRequiredOpChecker(
55 const Graph* graph, const FunctionLibraryDefinition* flib_def)
56 : graph_(*graph), flib_def_(*flib_def) {
57 cache_.resize(graph_.num_node_ids());
58 }
59
IsPlacerInspectionRequired(const Node & node,bool * is_deep)60 Status PlacerInspectionRequiredOpChecker::IsPlacerInspectionRequired(
61 const Node& node, bool* is_deep) {
62 if (cache_[node.id()].has_value()) {
63 *is_deep = cache_[node.id()].value();
64 return Status::OK();
65 }
66
67 if (!IsFunctionCall(node)) {
68 return Set(node, false, is_deep, &cache_);
69 }
70 const FunctionDef* fdef;
71 NameAttrList func;
72 TF_RETURN_IF_ERROR(GetFunctionDefAndAttrs(flib_def_, node, &fdef, &func));
73 DataTypeVector types;
74 TF_RETURN_IF_ERROR(
75 OutputTypesForNode(AttrSlice(&func.attr()), fdef->signature(), &types));
76 for (DataType type : types) {
77 if (type == DT_RESOURCE) {
78 return Set(node, true, is_deep, &cache_);
79 }
80 }
81 return Set(node, false, is_deep, &cache_);
82 }
83
GetFunctionDefAndAttrs(const FunctionLibraryDefinition & flib_def,const Node & node,const FunctionDef ** fdef,NameAttrList * func)84 Status GetFunctionDefAndAttrs(const FunctionLibraryDefinition& flib_def,
85 const Node& node, const FunctionDef** fdef,
86 NameAttrList* func) {
87 TF_RETURN_IF_ERROR(GetNodeAttr(node.def(), "f", func));
88 const string& function_name = func->name();
89 *fdef = flib_def.Find(function_name);
90 if (*fdef == nullptr) {
91 return errors::InvalidArgument(
92 "Failed to find function \"", function_name,
93 "\" in function library: ", flib_def.ToProto().DebugString());
94 }
95 return Status::OK();
96 }
97
FunctionStack(const string & function_name)98 FunctionStack::FunctionStack(const string& function_name)
99 : current_function_name_(function_name) {}
100
Push(const Node * node_in_current_function,const string & new_current_function) const101 FunctionStack FunctionStack::Push(const Node* node_in_current_function,
102 const string& new_current_function) const {
103 FunctionStack new_stack(new_current_function);
104 new_stack.frames_ = frames_;
105 new_stack.frames_.emplace_back(current_function_name_,
106 node_in_current_function);
107 return new_stack;
108 }
109
HasFunction(const string & function_name) const110 bool FunctionStack::HasFunction(const string& function_name) const {
111 if (current_function_name_ == function_name) {
112 return true;
113 }
114 for (const Frame& frame : frames_) {
115 if (frame.function_name == function_name) {
116 return true;
117 }
118 }
119 return false;
120 }
121
FormatForError() const122 string FunctionStack::FormatForError() const {
123 std::vector<string> msgs;
124 for (int i = 0; i < frames_.size(); ++i) {
125 if (frames_[i].function_name.empty()) {
126 // Empty function body should only happen at the top level, i.e. i = 0.
127 // All internal frames should have valid function names.
128 msgs.push_back(absl::StrCat("Graph contains node ",
129 FormatNodeForError(*frames_[i].node)));
130
131 } else {
132 msgs.push_back(absl::StrCat(
133 "Function ", errors::FormatFunctionForError(frames_[i].function_name),
134 " contains node ", FormatNodeForError(*frames_[i].node)));
135 }
136 const string& fname = (i + 1 < frames_.size())
137 ? frames_[i + 1].function_name
138 : current_function_name_;
139 msgs.push_back(absl::StrCat("Node ", FormatNodeForError(*frames_[i].node),
140 " calls function ",
141 errors::FormatFunctionForError(fname)));
142 }
143 return absl::StrJoin(msgs, "\n ");
144 }
145
146 namespace {
147
148 using OutputEdgeMap = std::vector<std::vector<const Edge*>>;
149
150 constexpr char kIdentityOp[] = "Identity";
151
// Returns a name that is not yet in `*node_names` and records it there.
//
// If `candidate_name` itself is unused it is returned unchanged. Otherwise
// "<candidate_name>_<k>" is returned for the smallest k = 0, 1, 2, ... that is
// not already taken.
string Uniquify(const string& candidate_name,
                std::unordered_set<string>* node_names) {
  // insert() reports via `.second` whether the name was newly added.
  if (node_names->insert(candidate_name).second) {
    return candidate_name;
  }

  int suffix = 0;
  while (true) {
    string attempt = absl::StrCat(candidate_name, "_", suffix);
    if (node_names->insert(attempt).second) {
      return attempt;
    }
    ++suffix;
  }
}
167
AddInputIdentity(Node * node,int input_idx,Graph * graph,std::unordered_set<string> * node_names)168 Status AddInputIdentity(Node* node, int input_idx, Graph* graph,
169 std::unordered_set<string>* node_names) {
170 const Edge* edge;
171 TF_RETURN_IF_ERROR(node->input_edge(input_idx, &edge));
172
173 string identity_name = Uniquify(
174 absl::StrCat(edge->src()->name(), "_", node->name()), node_names);
175
176 NodeDefBuilder builder(identity_name, kIdentityOp);
177 builder.Attr("T", node->input_type(input_idx));
178 NodeDefBuilder::NodeOut input(edge->src()->name(), edge->src_output(),
179 node->input_type(input_idx));
180 builder.Input(input);
181 NodeDef identity_def;
182 TF_RETURN_IF_ERROR(builder.Finalize(&identity_def));
183 MergeDebugInfo(NodeDebugInfo(*node), &identity_def);
184
185 VLOG(6) << "Adding identity into " << edge->src()->name() << ":"
186 << edge->src_output() << " -> " << edge->dst()->name() << ":"
187 << input_idx << " \n"
188 << identity_def.DebugString();
189
190 Status status;
191 Node* identity_node = graph->AddNode(identity_def, &status);
192 if (!status.ok()) {
193 return status;
194 }
195 graph->AddEdge(edge->src(), edge->src_output(), identity_node, 0);
196
197 // Replace node's `input_idx` input with the new identity's 0'th output
198 TF_RETURN_IF_ERROR(graph->UpdateEdge(identity_node, 0, node, input_idx));
199
200 VLOG(6) << "Successfully inserted identity. Modified node: \n"
201 << node->DebugString();
202 return Status::OK();
203 }
204
205 struct EdgePtrCompare {
operator ()tensorflow::__anon98fe080f0211::EdgePtrCompare206 bool operator()(const Edge* lhs, const Edge* rhs) const {
207 return lhs->id() < rhs->id();
208 }
209 };
210
AddOutputIdentities(Node * node,Graph * graph,std::unordered_set<string> * node_names)211 Status AddOutputIdentities(Node* node, Graph* graph,
212 std::unordered_set<string>* node_names) {
213 auto add_identity = [&](int src_output, const string& identity_name,
214 Node** identity_node) {
215 NodeDefBuilder builder(identity_name, kIdentityOp);
216 builder.Attr("T", node->output_type(src_output));
217 NodeDefBuilder::NodeOut input(node->name(), src_output,
218 node->output_type(src_output));
219 builder.Input(input);
220 NodeDef identity_def;
221 TF_RETURN_IF_ERROR(builder.Finalize(&identity_def));
222 MergeDebugInfo(NodeDebugInfo(*node), &identity_def);
223
224 Status status;
225 *identity_node = graph->AddNode(identity_def, &status);
226 if (!status.ok()) {
227 return status;
228 }
229 graph->AddEdge(node, src_output, *identity_node, 0);
230 return Status::OK();
231 };
232
233 // output_used[i] == true iff `node`'s i'th output is used
234 // in this graph
235 std::vector<bool> output_used(node->num_outputs(), false);
236 // Copy the set of edges since EdgeSet does not allow modifications
237 // to graph edges during iteration.
238 const EdgeSet& out_edges = node->out_edges();
239 std::vector<const Edge*> edge_vector(out_edges.begin(), out_edges.end());
240 std::sort(edge_vector.begin(), edge_vector.end(), EdgePtrCompare());
241 for (const Edge* edge : edge_vector) {
242 if (edge->IsControlEdge()) {
243 continue;
244 }
245 output_used[edge->src_output()] = true;
246
247 Node* dst = edge->dst();
248 int dst_input = edge->dst_input();
249 int src_output = edge->src_output();
250 string identity_name =
251 Uniquify(absl::StrCat(node->name(), "_", dst->name()), node_names);
252 Node* identity_node;
253 TF_RETURN_IF_ERROR(add_identity(src_output, identity_name, &identity_node));
254 VLOG(6) << "Adding identity into " << node->name() << ":" << src_output
255 << " -> " << dst->name() << ":" << dst_input << " \n"
256 << identity_node->DebugString();
257
258 // Make original dst node consume the new identity's output instead of
259 // `node`'s output.
260 TF_RETURN_IF_ERROR(graph->UpdateEdge(identity_node, 0, dst, dst_input));
261 }
262
263 for (int output_idx = 0; output_idx < node->num_outputs(); ++output_idx) {
264 if (output_used[output_idx]) {
265 continue;
266 }
267 // The output is unused in the graph. Just add an identity
268 // consuming it.
269 string identity_name = Uniquify(node->name(), node_names);
270 Node* identity_node;
271 TF_RETURN_IF_ERROR(add_identity(output_idx, identity_name, &identity_node));
272 VLOG(6) << "Added identity into " << node->name() << ":" << output_idx
273 << " -> <no consumer>: \n"
274 << identity_node->DebugString();
275 }
276 return Status::OK();
277 }
278
IsolateNode(Node * node,Graph * graph)279 Status IsolateNode(Node* node, Graph* graph) {
280 // We use `node_names` to make sure we pick unique names.
281 // We don't use graph->NewName() because it produces verbose names and
282 // does not actually ensure that they are unique (it assumes all names
283 // are generated using it, which is not true today).
284 std::unordered_set<string> node_names(graph->num_nodes());
285 for (Node* n : graph->nodes()) {
286 node_names.insert(n->name());
287 }
288
289 for (int i = 0; i < node->num_inputs(); ++i) {
290 TF_RETURN_IF_ERROR(AddInputIdentity(node, i, graph, &node_names));
291 }
292 TF_RETURN_IF_ERROR(AddOutputIdentities(node, graph, &node_names));
293 return Status::OK();
294 }
295
296 } // namespace
297
IsolatePlacerInspectionRequiredOps(const FunctionLibraryDefinition & flib_def,Graph * graph)298 Status IsolatePlacerInspectionRequiredOps(
299 const FunctionLibraryDefinition& flib_def, Graph* graph) {
300 PlacerInspectionRequiredOpChecker checker(graph, &flib_def);
301 // It is OK to add nodes to the graph during iteration.
302 // New nodes will get ids above current ids. The loop
303 // will loop over current nodes only because the op_nodes()
304 // iterator uses node ids to iterate.
305 // Because the new nodes will be higher ids, the caching in
306 // the checker will also work fine as new nodes are added.
307 for (Node* node : graph->op_nodes()) {
308 bool should_be_isolated = false;
309 TF_RETURN_IF_ERROR(
310 checker.IsPlacerInspectionRequired(*node, &should_be_isolated));
311 if (!should_be_isolated) {
312 continue;
313 }
314 TF_RETURN_IF_ERROR(IsolateNode(node, graph));
315 }
316
317 return Status::OK();
318 }
319
320 } // namespace tensorflow
321