1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
#include "tensorflow/core/framework/rendezvous.h"

#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "tensorflow/core/framework/local_rendezvous.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/lib/gtl/flatmap.h"
#include "tensorflow/core/lib/gtl/manual_constructor.h"
#include "tensorflow/core/lib/hash/hash.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
35 
36 namespace tensorflow {
37 
operator =(const ParsedKey & b)38 Rendezvous::ParsedKey& Rendezvous::ParsedKey::operator=(const ParsedKey& b) {
39   const char* b_base = b.buf_.data();
40   buf_ = b.buf_;
41   src_device = StringPiece(buf_.data() + (b.src_device.data() - b_base),
42                            b.src_device.size());
43   src = b.src;
44   src_incarnation = b.src_incarnation;
45   dst_device = StringPiece(buf_.data() + (b.dst_device.data() - b_base),
46                            b.dst_device.size());
47   dst = b.dst;
48   edge_name = StringPiece(buf_.data() + (b.edge_name.data() - b_base),
49                           b.edge_name.size());
50   return *this;
51 }
52 
53 /*  static */
CreateKey(const string & src_device,uint64 src_incarnation,const string & dst_device,const string & name,const FrameAndIter & frame_iter)54 string Rendezvous::CreateKey(const string& src_device, uint64 src_incarnation,
55                              const string& dst_device, const string& name,
56                              const FrameAndIter& frame_iter) {
57   // NOTE: ';' is not used in the device name's job name.
58   //
59   // We include both sender and receiver in the key to facilitate
60   // debugging. For correctness, we only need to encode the receiver.
61   //
62   // "src_incarnation" is used to distinguish a worker when it
63   // restarts.
64   char buf[strings::kFastToBufferSize];
65   return strings::StrCat(
66       src_device, ";", strings::Uint64ToHexString(src_incarnation, buf), ";",
67       dst_device, ";", name, ";", frame_iter.frame_id, ":", frame_iter.iter_id);
68 }
69 
70 // Return the prefix of "*s" up to the next occurrence of "delim", or
71 // the whole remaining string if "delim" is not found.  "*s" is advanced
72 // past the string returned plus the delimiter (if found).
ConsumeNextPart(StringPiece * s,char delim)73 static StringPiece ConsumeNextPart(StringPiece* s, char delim) {
74   for (size_t offset = 0; offset < s->size(); offset++) {
75     if ((*s)[offset] == delim) {
76       StringPiece result(s->data(), offset);
77       s->remove_prefix(offset + 1);  // +1: remove delim, as well
78       return result;
79     }
80   }
81   // No delimiter found: return rest of string
82   StringPiece result(s->data(), s->size());
83   s->remove_prefix(s->size());
84   return result;
85 }
86 
87 /* static */
ParseKey(StringPiece key,ParsedKey * out)88 Status Rendezvous::ParseKey(StringPiece key, ParsedKey* out) {
89   if (key.data() == out->buf_.data()) {
90     // Caller used our buf_ string directly, so we don't need to copy.  (The
91     // SendOp and RecvOp implementations do this, for example).
92     DCHECK_EQ(key.size(), out->buf_.size());
93   } else {
94     // Make a copy that our StringPieces can point at a copy that will persist
95     // for the lifetime of the ParsedKey object.
96     out->buf_.assign(key.data(), key.size());
97   }
98   StringPiece s(out->buf_);
99   StringPiece parts[5];
100   for (int i = 0; i < 5; i++) {
101     parts[i] = ConsumeNextPart(&s, ';');
102   }
103   if (s.empty() &&          // Consumed the whole string
104       !parts[4].empty() &&  // Exactly five parts
105       DeviceNameUtils::ParseFullName(parts[0], &out->src) &&
106       strings::HexStringToUint64(parts[1], &out->src_incarnation) &&
107       DeviceNameUtils::ParseFullName(parts[2], &out->dst) &&
108       !parts[3].empty()) {
109     out->src_device = StringPiece(parts[0].data(), parts[0].size());
110     out->dst_device = StringPiece(parts[2].data(), parts[2].size());
111     out->edge_name = StringPiece(parts[3].data(), parts[3].size());
112     return Status::OK();
113   }
114   return errors::InvalidArgument("Invalid  rendezvous key: ", key);
115 }
116 
~RendezvousInterface()117 RendezvousInterface::~RendezvousInterface() {}
118 
Recv(const ParsedKey & key,const Args & recv_args,Tensor * val,bool * is_dead,int64 timeout_ms)119 Status RendezvousInterface::Recv(const ParsedKey& key, const Args& recv_args,
120                                  Tensor* val, bool* is_dead, int64 timeout_ms) {
121   Status ret;
122   Notification n;
123   RecvAsync(key, recv_args,
124             [&ret, &n, val, is_dead](const Status& s, const Args& send_args,
125                                      const Args& recv_args, const Tensor& v,
126                                      const bool dead) {
127               ret = s;
128               *val = v;
129               *is_dead = dead;
130               n.Notify();
131             });
132   if (timeout_ms > 0) {
133     int64 timeout_us = timeout_ms * 1000;
134     bool notified = WaitForNotificationWithTimeout(&n, timeout_us);
135     if (!notified) {
136       return Status(error::DEADLINE_EXCEEDED,
137                     "Timed out waiting for notification");
138     }
139   } else {
140     n.WaitForNotification();
141   }
142   return ret;
143 }
144 
Recv(const ParsedKey & key,const Args & args,Tensor * val,bool * is_dead)145 Status RendezvousInterface::Recv(const ParsedKey& key, const Args& args,
146                                  Tensor* val, bool* is_dead) {
147   const int64 no_timeout = 0;
148   return Recv(key, args, val, is_dead, no_timeout);
149 }
150 
151 namespace {
152 class LocalRendezvousWrapper : public Rendezvous {
153  public:
LocalRendezvousWrapper()154   LocalRendezvousWrapper() : impl_(this) {}
155 
Send(const ParsedKey & key,const Args & send_args,const Tensor & val,const bool is_dead)156   Status Send(const ParsedKey& key, const Args& send_args, const Tensor& val,
157               const bool is_dead) override {
158     return impl_.Send(key, send_args, val, is_dead);
159   }
160 
RecvAsync(const ParsedKey & key,const Args & recv_args,DoneCallback done)161   void RecvAsync(const ParsedKey& key, const Args& recv_args,
162                  DoneCallback done) override {
163     impl_.RecvAsync(key, recv_args, std::move(done));
164   }
165 
StartAbort(const Status & status)166   void StartAbort(const Status& status) override { impl_.StartAbort(status); }
167 
168  private:
169   LocalRendezvous impl_;
170 
171   TF_DISALLOW_COPY_AND_ASSIGN(LocalRendezvousWrapper);
172 };
173 }  // namespace
174 
NewLocalRendezvous()175 Rendezvous* NewLocalRendezvous() { return new LocalRendezvousWrapper; }
176 
177 }  // end namespace tensorflow
178