1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_
17 #define TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_
18 
19 #include "tensorflow/core/framework/rendezvous.h"
20 #include "tensorflow/core/framework/tensor.h"
21 #include "tensorflow/core/lib/core/status.h"
22 #include "tensorflow/core/lib/gtl/flatmap.h"
23 #include "tensorflow/core/platform/macros.h"
24 #include "tensorflow/core/platform/mutex.h"
25 #include "tensorflow/core/platform/types.h"
26 
27 namespace tensorflow {
28 
29 // Implements the basic logic of matching Send and Recv operations. See
30 // RendezvousInterface for more details.
31 //
32 // NOTE: Most users will use a class that wraps LocalRendezvous, such as
33 // IntraProcessRendezvous or RemoteRendezvous. This class does not implement
34 // RendezvousInterface because virtual dispatch to LocalRendezvous methods
35 // is not expected to be needed.
36 class LocalRendezvous {
37  public:
38   // If the class wrapping LocalRendezvous is refcounted (i.e., extending
39   // Rendezvous), pass in its pointer in constructor so the LocalRendezvous
40   // can make sure it outlives the async recv requests.
41   // Pass in nullptr if the wrapping class is not refcounted.
LocalRendezvous(Rendezvous * owner)42   explicit LocalRendezvous(Rendezvous* owner) : rc_owner_(owner) {}
43   ~LocalRendezvous();
44 
45   Status Send(const Rendezvous::ParsedKey& key,
46               const Rendezvous::Args& send_args, const Tensor& val,
47               const bool is_dead);
48   void RecvAsync(const Rendezvous::ParsedKey& key,
49                  const Rendezvous::Args& recv_args,
50                  Rendezvous::DoneCallback done);
51   void StartAbort(const Status& status);
52 
53  private:
54   struct Item;
55 
56   // By invariant, the item queue under each key is of the form
57   //   [item.type == kSend]* meaning each item is a sent message.
58   // or
59   //   [item.type == kRecv]* meaning each item is a waiter.
60   struct ItemQueue {
61     void push_back(Item* item);
62 
63     Item* head = nullptr;
64     Item* tail = nullptr;
65   };
66 
67   typedef gtl::FlatMap<uint64, ItemQueue> Table;
68 
69   // Pointer to the owner class of this LocalRendezvous if it is refcounted.
70   const Rendezvous* rc_owner_;
71 
72   // TODO(zhifengc): shard table_.
73   mutex mu_;
74   Table table_ TF_GUARDED_BY(mu_);
75   Status status_ TF_GUARDED_BY(mu_);
76 
77   TF_DISALLOW_COPY_AND_ASSIGN(LocalRendezvous);
78 };
79 
80 }  // namespace tensorflow
81 
82 #endif  // TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_
83