1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_ 17 #define TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_ 18 19 #include "tensorflow/core/framework/rendezvous.h" 20 #include "tensorflow/core/framework/tensor.h" 21 #include "tensorflow/core/lib/core/status.h" 22 #include "tensorflow/core/lib/gtl/flatmap.h" 23 #include "tensorflow/core/platform/macros.h" 24 #include "tensorflow/core/platform/mutex.h" 25 #include "tensorflow/core/platform/types.h" 26 27 namespace tensorflow { 28 29 // Implements the basic logic of matching Send and Recv operations. See 30 // RendezvousInterface for more details. 31 // 32 // NOTE: Most users will use a class that wraps LocalRendezvous, such as 33 // IntraProcessRendezvous or RemoteRendezvous. This class does not implement 34 // RendezvousInterface because virtual dispatch to LocalRendezvous methods 35 // is not expected to be needed. 36 class LocalRendezvous { 37 public: 38 // If the class wrapping LocalRendezvous is refcounted (i.e., extending 39 // Rendezvous), pass in its pointer in constructor so the LocalRendezvous 40 // can make sure it outlives the async recv requests. 41 // Pass in nullptr if the wrapping class is not refcounted. LocalRendezvous(Rendezvous * owner)42 explicit LocalRendezvous(Rendezvous* owner) : rc_owner_(owner) {} 43 ~LocalRendezvous(); 44 45 Status Send(const Rendezvous::ParsedKey& key, 46 const Rendezvous::Args& send_args, const Tensor& val, 47 const bool is_dead); 48 void RecvAsync(const Rendezvous::ParsedKey& key, 49 const Rendezvous::Args& recv_args, 50 Rendezvous::DoneCallback done); 51 void StartAbort(const Status& status); 52 53 private: 54 struct Item; 55 56 // By invariant, the item queue under each key is of the form 57 // [item.type == kSend]* meaning each item is a sent message. 58 // or 59 // [item.type == kRecv]* meaning each item is a waiter. 60 struct ItemQueue { 61 void push_back(Item* item); 62 63 Item* head = nullptr; 64 Item* tail = nullptr; 65 }; 66 67 typedef gtl::FlatMap<uint64, ItemQueue> Table; 68 69 // Pointer to the owner class of this LocalRendezvous if it is refcounted. 70 const Rendezvous* rc_owner_; 71 72 // TODO(zhifengc): shard table_. 73 mutex mu_; 74 Table table_ TF_GUARDED_BY(mu_); 75 Status status_ TF_GUARDED_BY(mu_); 76 77 TF_DISALLOW_COPY_AND_ASSIGN(LocalRendezvous); 78 }; 79 80 } // namespace tensorflow 81 82 #endif // TENSORFLOW_CORE_FRAMEWORK_LOCAL_RENDEZVOUS_H_ 83