1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_FRAMEWORK_RENDEZVOUS_H_
17 #define TENSORFLOW_FRAMEWORK_RENDEZVOUS_H_
18 
#include <functional>
#include <string>

#include "tensorflow/core/framework/control_flow.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/util/device_name_utils.h"
27 
28 namespace tensorflow {
29 
30 // A Rendezvous is an abstraction for passing tensors from producers
31 // to consumers. A rendezvous is a table of channels. Each channel is
32 // keyed by a rendezvous key. The key encodes a pair of <producer,
33 // consumer>, where the producer and the consumer are tensorflow
34 // devices.
35 //
36 // The producer calls the Send() method to send one tensor over one
37 // named channel. The consumer calls the Recv() method to receive one
38 // tensor from a named channel. A sequence of tensors can be passed
39 // from the producer to the consumer.  The consumer receives them in
40 // the order as the producer sends them.
41 //
42 // A consumer may safely request the tensor before or after it has
43 // been produced.  A consumer has the choice of making a blocking call
44 // or providing a callback: in either case, the consumer receives the
45 // Tensor as soon as it is available.  A producer never blocks.
46 class Rendezvous : public core::RefCounted {
47  public:
48   struct Args {
49     DeviceContext* device_context = nullptr;
50     AllocatorAttributes alloc_attrs;
51   };
52 
53   // Constructs a rendezvous key for the tensor of "name" sent from
54   // "src_device" to "dst_device". The tensor is generated in the frame
55   // and iteration specified by "frame_iter".
56   static string CreateKey(const string& src_device, uint64 src_incarnation,
57                           const string& dst_device, const string& name,
58                           const FrameAndIter& frame_iter);
59 
60   // Parses the key constructed by CreateKey and parse src/dst device
61   // names into structures respectively.
62   struct ParsedKey {
63     StringPiece src_device;
64     DeviceNameUtils::ParsedName src;
65     uint64 src_incarnation = 0;
66     StringPiece dst_device;
67     DeviceNameUtils::ParsedName dst;
68     StringPiece edge_name;
69 
ParsedKeyParsedKey70     ParsedKey() {}
ParsedKeyParsedKey71     ParsedKey(const ParsedKey& b) { *this = b; }
72 
73     ParsedKey& operator=(const ParsedKey& b);
FullKeyParsedKey74     StringPiece FullKey() const { return buf_; }
75 
76    private:
77     friend class Rendezvous;
78     friend class SendOp;
79     friend class RecvOp;
80     string buf_;
81   };
82   static Status ParseKey(StringPiece key, ParsedKey* out);
83 
84   // The caller is a tensor producer and it sends a message (a tensor
85   // "val" and a bool "is_dead") under the given "key".
86   //
87   // {val, is_dead} is bundled as a message sent and received.
88   // Typically, is_dead is set by some control flow nodes
89   // (e.g., a not-taken branch).  args is passed by Send to the
90   // Recv function to communicate any information that the Recv
91   // function might need.  This is typically only necessary for
92   // Send/Recv on the same worker.
93   //
94   // Send() never blocks.
95   virtual Status Send(const ParsedKey& key, const Args& args, const Tensor& val,
96                       const bool is_dead) = 0;
97 
98   // Callback provided by a tensor consumer waiting on the rendezvous.
99   // It will be invoked when the tensor is available, or when a non-OK
100   // status arises in the production of that tensor.  It also gets
101   // two Rendezvous::Args, one provided by the sender, the other by the
102   // receiver, which may be needed when a non-CPU device is in use
103   // by either side.
104   typedef std::function<void(const Status&, const Args&, const Args&,
105                              const Tensor&, const bool)>
106       DoneCallback;
107 
108   virtual void RecvAsync(const ParsedKey& key, const Args& args,
109                          DoneCallback done) = 0;
110 
111   // Synchronous wrapper for RecvAsync.
112   Status Recv(const ParsedKey& key, const Args& args, Tensor* val,
113               bool* is_dead, int64 timeout_ms);
114   Status Recv(const ParsedKey& key, const Args& args, Tensor* val,
115               bool* is_dead);
116 
117   // Aborts all pending and future Send/Recv with the given "status".
118   //
119   // StartAbort() does not wait for ongoing calls to finish.
120   // REQUIRES: !status.ok()
121   virtual void StartAbort(const Status& status) = 0;
122 
123  protected:
124   ~Rendezvous() override;
125 };
126 
127 // Returns a Rendezvous instance that is limited to use only by
128 // producers and consumers in the local process.  The caller assumes
129 // ownership of one Ref() on the returned object.
130 Rendezvous* NewLocalRendezvous();
131 
132 }  // end namespace tensorflow
133 
134 #endif  // TENSORFLOW_FRAMEWORK_RENDEZVOUS_H_
135