1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/status.h"
17 
18 #include <stdio.h>
19 
20 #include <deque>
21 #include <map>
22 
23 #include "absl/base/call_once.h"
24 #include "tensorflow/core/platform/mutex.h"
25 #include "tensorflow/core/platform/stacktrace.h"
26 #include "tensorflow/core/platform/str_util.h"
27 #include "tensorflow/core/platform/strcat.h"
28 #include "tensorflow/core/platform/stringprintf.h"
29 #include "tensorflow/core/protobuf/error_codes.pb.h"
30 
31 namespace tensorflow {
32 
33 namespace {
34 
35 // Log sink is used to collect recent warning and error log messages to be
36 // attached to the error status.
37 class StatusLogSink : public TFLogSink {
38  public:
GetInstance()39   static StatusLogSink* GetInstance() {
40     static StatusLogSink* sink = new StatusLogSink();
41     return sink;
42   }
43 
enable()44   void enable() {
45     absl::call_once(flag_, [this] {
46       num_messages_ = 5;  // default to 5 messages
47 
48       if (const char* num_msgs_str =
49               getenv("TF_WORKER_NUM_FORWARDED_LOG_MESSAGES")) {
50         if (!absl::SimpleAtoi(num_msgs_str, &num_messages_)) {
51           LOG(WARNING) << "Failed to parse env variable "
52                           "TF_WORKER_NUM_WARNING_ERROR_LOG_IN_STATUS="
53                        << num_msgs_str << " as int. Using the default value "
54                        << num_messages_ << ".";
55         }
56       }
57 
58       if (num_messages_ > 0) {
59         TFAddLogSink(this);
60       }
61     });
62   }
63 
GetMessages(std::vector<std::string> * logs)64   void GetMessages(std::vector<std::string>* logs) TF_LOCKS_EXCLUDED(mu_) {
65     mutex_lock lock(mu_);
66 
67     for (auto& msg : messages_) {
68       logs->push_back(msg);
69     }
70   }
71 
Send(const TFLogEntry & entry)72   void Send(const TFLogEntry& entry) override TF_LOCKS_EXCLUDED(mu_) {
73     if (entry.log_severity() < absl::LogSeverity::kWarning) return;
74 
75     mutex_lock lock(mu_);
76     messages_.emplace_back(entry.ToString());
77     if (messages_.size() > static_cast<size_t>(num_messages_)) {
78       messages_.pop_front();
79     }
80   }
81 
82  private:
83   mutex mu_;
84   // for allowing repeated/concurrent calls to enable()
85   absl::once_flag flag_;
86   int num_messages_ = 0;
87   std::deque<std::string> messages_ TF_GUARDED_BY(mu_);
88 };
89 
90 }  // namespace
91 
Status(tensorflow::error::Code code,tensorflow::StringPiece msg,std::vector<StackFrame> && stack_trace)92 Status::Status(tensorflow::error::Code code, tensorflow::StringPiece msg,
93                std::vector<StackFrame>&& stack_trace) {
94   assert(code != tensorflow::error::OK);
95   state_ = std::unique_ptr<State>(new State);
96   state_->code = code;
97   state_->msg = string(msg);
98   state_->stack_trace = std::move(stack_trace);
99   VLOG(5) << "Generated non-OK status: \"" << *this << "\". "
100           << CurrentStackTrace();
101 }
102 
Update(const Status & new_status)103 void Status::Update(const Status& new_status) {
104   if (ok()) {
105     *this = new_status;
106   }
107 }
108 
SlowCopyFrom(const State * src)109 void Status::SlowCopyFrom(const State* src) {
110   if (src == nullptr) {
111     state_ = nullptr;
112   } else {
113     state_ = std::unique_ptr<State>(new State(*src));
114   }
115 }
116 
empty_string()117 const string& Status::empty_string() {
118   static string* empty = new string;
119   return *empty;
120 }
121 
empty_stack_trace()122 const std::vector<StackFrame>& Status::empty_stack_trace() {
123   static std::vector<StackFrame>* empty = new std::vector<StackFrame>();
124   return *empty;
125 }
126 
error_name(error::Code code)127 string error_name(error::Code code) {
128   switch (code) {
129     case tensorflow::error::OK:
130       return "OK";
131       break;
132     case tensorflow::error::CANCELLED:
133       return "Cancelled";
134       break;
135     case tensorflow::error::UNKNOWN:
136       return "Unknown";
137       break;
138     case tensorflow::error::INVALID_ARGUMENT:
139       return "Invalid argument";
140       break;
141     case tensorflow::error::DEADLINE_EXCEEDED:
142       return "Deadline exceeded";
143       break;
144     case tensorflow::error::NOT_FOUND:
145       return "Not found";
146       break;
147     case tensorflow::error::ALREADY_EXISTS:
148       return "Already exists";
149       break;
150     case tensorflow::error::PERMISSION_DENIED:
151       return "Permission denied";
152       break;
153     case tensorflow::error::UNAUTHENTICATED:
154       return "Unauthenticated";
155       break;
156     case tensorflow::error::RESOURCE_EXHAUSTED:
157       return "Resource exhausted";
158       break;
159     case tensorflow::error::FAILED_PRECONDITION:
160       return "Failed precondition";
161       break;
162     case tensorflow::error::ABORTED:
163       return "Aborted";
164       break;
165     case tensorflow::error::OUT_OF_RANGE:
166       return "Out of range";
167       break;
168     case tensorflow::error::UNIMPLEMENTED:
169       return "Unimplemented";
170       break;
171     case tensorflow::error::INTERNAL:
172       return "Internal";
173       break;
174     case tensorflow::error::UNAVAILABLE:
175       return "Unavailable";
176       break;
177     case tensorflow::error::DATA_LOSS:
178       return "Data loss";
179       break;
180     default:
181       char tmp[30];
182       snprintf(tmp, sizeof(tmp), "Unknown code(%d)", static_cast<int>(code));
183       return tmp;
184       break;
185   }
186 }
187 
ToString() const188 string Status::ToString() const {
189   if (state_ == nullptr) {
190     return "OK";
191   } else {
192     string result(error_name(code()));
193     result += ": ";
194     result += state_->msg;
195     return result;
196   }
197 }
198 
IgnoreError() const199 void Status::IgnoreError() const {
200   // no-op
201 }
202 
SetPayload(tensorflow::StringPiece type_url,tensorflow::StringPiece payload)203 void Status::SetPayload(tensorflow::StringPiece type_url,
204                         tensorflow::StringPiece payload) {
205   if (ok()) return;
206   state_->payloads[std::string(type_url)] = std::string(payload);
207 }
208 
GetPayload(tensorflow::StringPiece type_url) const209 tensorflow::StringPiece Status::GetPayload(
210     tensorflow::StringPiece type_url) const {
211   if (ok()) return tensorflow::StringPiece();
212   auto payload_iter = state_->payloads.find(std::string(type_url));
213   if (payload_iter == state_->payloads.end()) return tensorflow::StringPiece();
214   return tensorflow::StringPiece(payload_iter->second);
215 }
216 
ErasePayload(tensorflow::StringPiece type_url)217 bool Status::ErasePayload(tensorflow::StringPiece type_url) {
218   if (ok()) return false;
219   auto payload_iter = state_->payloads.find(std::string(type_url));
220   if (payload_iter == state_->payloads.end()) return false;
221   state_->payloads.erase(payload_iter);
222   return true;
223 }
224 
operator <<(std::ostream & os,const Status & x)225 std::ostream& operator<<(std::ostream& os, const Status& x) {
226   os << x.ToString();
227   return os;
228 }
229 
TfCheckOpHelperOutOfLine(const::tensorflow::Status & v,const char * msg)230 string* TfCheckOpHelperOutOfLine(const ::tensorflow::Status& v,
231                                  const char* msg) {
232   string r("Non-OK-status: ");
233   r += msg;
234   r += " status: ";
235   r += v.ToString();
236   // Leaks string but this is only to be used in a fatal error message
237   return new string(r);
238 }
239 
// kDerivedMarker is prepended to the Status message string to indicate that a
// Status object is not the root cause of an error but was triggered by
// cancelling/aborting a step. A message without the marker is treated as a
// root cause.
static constexpr char kDerivedMarker[] = "[_Derived_]";
244 
// Returns a derived version of `s`: a status with the same code whose message
// is prefixed with kDerivedMarker. A status that is already derived is
// returned unchanged.
Status StatusGroup::MakeDerived(const Status& s) {
  if (!IsDerived(s)) {
    return Status(s.code(), strings::StrCat(kDerivedMarker, s.error_message()));
  }
  return s;
}
252 
IsDerived(const Status & s)253 bool StatusGroup::IsDerived(const Status& s) {
254   return s.error_message().find(kDerivedMarker) != std::string::npos;
255 }
256 
ConfigureLogHistory()257 void StatusGroup::ConfigureLogHistory() {
258   StatusLogSink::GetInstance()->enable();
259 }
260 
Update(const Status & s)261 void StatusGroup::Update(const Status& s) {
262   if (s.ok()) {
263     ++num_ok_;
264   } else {
265     ok_ = false;
266     children_.push_back(s);
267   }
268 }
269 
GetNonDerivedStatuses(const std::vector<Status> & status)270 static std::vector<Status> GetNonDerivedStatuses(
271     const std::vector<Status>& status) {
272   std::vector<Status> nonderived_statuses;
273   for (auto& s : status) {
274     if (!StatusGroup::IsDerived(s)) {
275       nonderived_statuses.push_back(s);
276     }
277   }
278   return nonderived_statuses;
279 }
280 
// Upper bound, in characters, on the aggregated message built from several
// statuses (8 KiB); longer text is truncated.
static constexpr int kMaxAggregatedStatusMessageSize = 8 * 1024;

// Upper bound, in characters, on each recent log message attached to a
// summary status; longer messages are truncated.
static constexpr int kMaxAttachedLogMessageSize = 512;
283 
284 // Summarize all the status objects in the StatusGroup. This is used when
285 // individual Status objects in the StatusGroup are not already summarized.
as_summary_status() const286 Status StatusGroup::as_summary_status() const {
287   if (ok_) {
288     return Status::OK();
289   }
290 
291   // Gather recent logs as a string
292   auto get_recent_logs = [this]() -> std::string {
293     if (!recent_logs_.empty()) {
294       std::vector<std::string> fmt;
295       fmt.push_back("\nRecent warning and error logs:");
296       for (auto& log : recent_logs_) {
297         // Add an indentation to make it look nicer.
298         fmt.push_back("  " + log.substr(0, kMaxAttachedLogMessageSize));
299       }
300       return absl::StrJoin(fmt, "\n");
301     } else {
302       return "";
303     }
304   };
305 
306   std::vector<Status> nonderived_statuses = GetNonDerivedStatuses(children_);
307 
308   // If only one root status is found, do not add summary header and footer.
309   if (nonderived_statuses.size() == 1) {
310     return Status(nonderived_statuses[0].code(),
311                   strings::StrCat(nonderived_statuses[0].error_message(),
312                                   get_recent_logs()));
313   }
314 
315   if (!nonderived_statuses.empty()) {
316     std::vector<std::string> fmt;
317 
318     fmt.push_back(strings::Printf("%zu root error(s) found.",
319                                   nonderived_statuses.size()));
320 
321     int index = 0;
322     auto code = tensorflow::error::CANCELLED;
323     for (auto& s : nonderived_statuses) {
324       // NOTE: Avoid using CANCELLED as the code of summary status if the group
325       // contains other error code.
326       if (code == tensorflow::error::CANCELLED &&
327           s.code() != tensorflow::error::CANCELLED) {
328         code = s.code();
329       }
330       fmt.emplace_back(strings::StrCat("  (", index, ") ", s.ToString()));
331       ++index;
332     }
333 
334     fmt.push_back(strings::Printf("%zu successful operations.", num_ok_));
335     fmt.push_back(
336         strings::Printf("%zu derived errors ignored.",
337                         children_.size() - nonderived_statuses.size()));
338 
339     std::string error_msg =
340         absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize);
341 
342     return Status(code, strings::StrCat(error_msg, get_recent_logs()));
343   } else {
344     // All statuses are derived. Pick the first available status to return.
345     return children_[0];
346   }
347 }
348 
349 // Concatenate all the status objects in the StatusGroup. This is used when
350 // individual Status objects in the StatusGroup are already summarized Status.
as_concatenated_status() const351 Status StatusGroup::as_concatenated_status() const {
352   if (ok_) {
353     return Status::OK();
354   }
355 
356   std::vector<Status> nonderived_statuses = GetNonDerivedStatuses(children_);
357 
358   // If only one root status is found, return it directly.
359   if (nonderived_statuses.size() == 1) {
360     return nonderived_statuses[0];
361   }
362 
363   if (!nonderived_statuses.empty()) {
364     std::vector<string> fmt;
365     fmt.emplace_back("\n=====================");
366     for (auto& s : nonderived_statuses) {
367       fmt.emplace_back(s.ToString());
368     }
369     fmt.emplace_back("=====================\n");
370     return Status(
371         nonderived_statuses[0].code(),
372         absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize));
373   } else {
374     // All statuses are derived. Pick the first available status to return.
375     // This should not happen in normal execution.
376     return children_[0];
377   }
378 }
379 
AttachLogMessages()380 void StatusGroup::AttachLogMessages() {
381   recent_logs_.clear();
382   StatusLogSink::GetInstance()->GetMessages(&recent_logs_);
383 }
384 
385 }  // namespace tensorflow
386