1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
20 #define GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
21 
22 #include <assert.h>
23 #include <grpcpp/impl/codegen/call.h>
24 #include <grpcpp/impl/codegen/channel_interface.h>
25 #include <grpcpp/impl/codegen/client_context.h>
26 #include <grpcpp/impl/codegen/server_context.h>
27 #include <grpcpp/impl/codegen/service_type.h>
28 #include <grpcpp/impl/codegen/status.h>
29 
30 namespace grpc {
31 
32 class CompletionQueue;
33 extern CoreCodegenInterface* g_core_codegen_interface;
34 
35 /// An interface relevant for async client side unary RPCs (which send
36 /// one request message to a server and receive one response message).
37 template <class R>
38 class ClientAsyncResponseReaderInterface {
39  public:
~ClientAsyncResponseReaderInterface()40   virtual ~ClientAsyncResponseReaderInterface() {}
41 
42   /// Start the call that was set up by the constructor, but only if the
43   /// constructor was invoked through the "Prepare" API which doesn't actually
44   /// start the call
45   virtual void StartCall() = 0;
46 
47   /// Request notification of the reading of initial metadata. Completion
48   /// will be notified by \a tag on the associated completion queue.
49   /// This call is optional, but if it is used, it cannot be used concurrently
50   /// with or after the \a Finish method.
51   ///
52   /// \param[in] tag Tag identifying this request.
53   virtual void ReadInitialMetadata(void* tag) = 0;
54 
55   /// Request to receive the server's response \a msg and final \a status for
56   /// the call, and to notify \a tag on this call's completion queue when
57   /// finished.
58   ///
59   /// This function will return when either:
60   /// - when the server's response message and status have been received.
61   /// - when the server has returned a non-OK status (no message expected in
62   ///   this case).
63   /// - when the call failed for some reason and the library generated a
64   ///   non-OK status.
65   ///
66   /// \param[in] tag Tag identifying this request.
67   /// \param[out] status To be updated with the operation status.
68   /// \param[out] msg To be filled in with the server's response message.
69   virtual void Finish(R* msg, Status* status, void* tag) = 0;
70 };
71 
72 namespace internal {
73 template <class R>
74 class ClientAsyncResponseReaderFactory {
75  public:
76   /// Start a call and write the request out if \a start is set.
77   /// \a tag will be notified on \a cq when the call has been started (i.e.
78   /// intitial metadata sent) and \a request has been written out.
79   /// If \a start is not set, the actual call must be initiated by StartCall
80   /// Note that \a context will be used to fill in custom initial metadata
81   /// used to send to the server when starting the call.
82   template <class W>
Create(ChannelInterface * channel,CompletionQueue * cq,const::grpc::internal::RpcMethod & method,ClientContext * context,const W & request,bool start)83   static ClientAsyncResponseReader<R>* Create(
84       ChannelInterface* channel, CompletionQueue* cq,
85       const ::grpc::internal::RpcMethod& method, ClientContext* context,
86       const W& request, bool start) {
87     ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
88     return new (g_core_codegen_interface->grpc_call_arena_alloc(
89         call.call(), sizeof(ClientAsyncResponseReader<R>)))
90         ClientAsyncResponseReader<R>(call, context, request, start);
91   }
92 };
93 }  // namespace internal
94 
95 /// Async API for client-side unary RPCs, where the message response
96 /// received from the server is of type \a R.
97 template <class R>
98 class ClientAsyncResponseReader final
99     : public ClientAsyncResponseReaderInterface<R> {
100  public:
101   // always allocated against a call arena, no memory free required
delete(void * ptr,std::size_t size)102   static void operator delete(void* ptr, std::size_t size) {
103     assert(size == sizeof(ClientAsyncResponseReader));
104   }
105 
106   // This operator should never be called as the memory should be freed as part
107   // of the arena destruction. It only exists to provide a matching operator
108   // delete to the operator new so that some compilers will not complain (see
109   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
110   // there are no tests catching the compiler warning.
delete(void *,void *)111   static void operator delete(void*, void*) { assert(0); }
112 
StartCall()113   void StartCall() override {
114     assert(!started_);
115     started_ = true;
116     StartCallInternal();
117   }
118 
119   /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for
120   /// semantics.
121   ///
122   /// Side effect:
123   ///   - the \a ClientContext associated with this call is updated with
124   ///     possible initial and trailing metadata sent from the server.
ReadInitialMetadata(void * tag)125   void ReadInitialMetadata(void* tag) override {
126     assert(started_);
127     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
128 
129     single_buf.set_output_tag(tag);
130     single_buf.RecvInitialMetadata(context_);
131     call_.PerformOps(&single_buf);
132     initial_metadata_read_ = true;
133   }
134 
135   /// See \a ClientAysncResponseReaderInterface::Finish for semantics.
136   ///
137   /// Side effect:
138   ///   - the \a ClientContext associated with this call is updated with
139   ///     possible initial and trailing metadata sent from the server.
Finish(R * msg,Status * status,void * tag)140   void Finish(R* msg, Status* status, void* tag) override {
141     assert(started_);
142     if (initial_metadata_read_) {
143       finish_buf.set_output_tag(tag);
144       finish_buf.RecvMessage(msg);
145       finish_buf.AllowNoMessage();
146       finish_buf.ClientRecvStatus(context_, status);
147       call_.PerformOps(&finish_buf);
148     } else {
149       single_buf.set_output_tag(tag);
150       single_buf.RecvInitialMetadata(context_);
151       single_buf.RecvMessage(msg);
152       single_buf.AllowNoMessage();
153       single_buf.ClientRecvStatus(context_, status);
154       call_.PerformOps(&single_buf);
155     }
156   }
157 
158  private:
159   friend class internal::ClientAsyncResponseReaderFactory<R>;
160   ClientContext* const context_;
161   ::grpc::internal::Call call_;
162   bool started_;
163   bool initial_metadata_read_ = false;
164 
165   template <class W>
ClientAsyncResponseReader(::grpc::internal::Call call,ClientContext * context,const W & request,bool start)166   ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context,
167                             const W& request, bool start)
168       : context_(context), call_(call), started_(start) {
169     // Bind the metadata at time of StartCallInternal but set up the rest here
170     // TODO(ctiller): don't assert
171     GPR_CODEGEN_ASSERT(single_buf.SendMessage(request).ok());
172     single_buf.ClientSendClose();
173     if (start) StartCallInternal();
174   }
175 
StartCallInternal()176   void StartCallInternal() {
177     single_buf.SendInitialMetadata(context_->send_initial_metadata_,
178                                    context_->initial_metadata_flags());
179   }
180 
181   // disable operator new
182   static void* operator new(std::size_t size);
new(std::size_t size,void * p)183   static void* operator new(std::size_t size, void* p) { return p; }
184 
185   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
186                               ::grpc::internal::CallOpSendMessage,
187                               ::grpc::internal::CallOpClientSendClose,
188                               ::grpc::internal::CallOpRecvInitialMetadata,
189                               ::grpc::internal::CallOpRecvMessage<R>,
190                               ::grpc::internal::CallOpClientRecvStatus>
191       single_buf;
192   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>,
193                               ::grpc::internal::CallOpClientRecvStatus>
194       finish_buf;
195 };
196 
197 /// Async server-side API for handling unary calls, where the single
198 /// response message sent to the client is of type \a W.
199 template <class W>
200 class ServerAsyncResponseWriter final
201     : public internal::ServerAsyncStreamingInterface {
202  public:
ServerAsyncResponseWriter(ServerContext * ctx)203   explicit ServerAsyncResponseWriter(ServerContext* ctx)
204       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
205 
206   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
207   ///
208   /// Side effect:
209   ///   The initial metadata that will be sent to the client from this op will
210   ///   be taken from the \a ServerContext associated with the call.
211   ///
212   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)213   void SendInitialMetadata(void* tag) override {
214     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
215 
216     meta_buf_.set_output_tag(tag);
217     meta_buf_.SendInitialMetadata(ctx_->initial_metadata_,
218                                   ctx_->initial_metadata_flags());
219     if (ctx_->compression_level_set()) {
220       meta_buf_.set_compression_level(ctx_->compression_level());
221     }
222     ctx_->sent_initial_metadata_ = true;
223     call_.PerformOps(&meta_buf_);
224   }
225 
226   /// Indicate that the stream is to be finished and request notification
227   /// when the server has sent the appropriate signals to the client to
228   /// end the call. Should not be used concurrently with other operations.
229   ///
230   /// \param[in] tag Tag identifying this request.
231   /// \param[in] status To be sent to the client as the result of the call.
232   /// \param[in] msg Message to be sent to the client.
233   ///
234   /// Side effect:
235   ///   - also sends initial metadata if not already sent (using the
236   ///     \a ServerContext associated with this call).
237   ///
238   /// Note: if \a status has a non-OK code, then \a msg will not be sent,
239   /// and the client will receive only the status with possible trailing
240   /// metadata.
Finish(const W & msg,const Status & status,void * tag)241   void Finish(const W& msg, const Status& status, void* tag) {
242     finish_buf_.set_output_tag(tag);
243     if (!ctx_->sent_initial_metadata_) {
244       finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
245                                       ctx_->initial_metadata_flags());
246       if (ctx_->compression_level_set()) {
247         finish_buf_.set_compression_level(ctx_->compression_level());
248       }
249       ctx_->sent_initial_metadata_ = true;
250     }
251     // The response is dropped if the status is not OK.
252     if (status.ok()) {
253       finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
254                                    finish_buf_.SendMessage(msg));
255     } else {
256       finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
257     }
258     call_.PerformOps(&finish_buf_);
259   }
260 
261   /// Indicate that the stream is to be finished with a non-OK status,
262   /// and request notification for when the server has finished sending the
263   /// appropriate signals to the client to end the call.
264   /// Should not be used concurrently with other operations.
265   ///
266   /// \param[in] tag Tag identifying this request.
267   /// \param[in] status To be sent to the client as the result of the call.
268   ///   - Note: \a status must have a non-OK code.
269   ///
270   /// Side effect:
271   ///   - also sends initial metadata if not already sent (using the
272   ///     \a ServerContext associated with this call).
FinishWithError(const Status & status,void * tag)273   void FinishWithError(const Status& status, void* tag) {
274     GPR_CODEGEN_ASSERT(!status.ok());
275     finish_buf_.set_output_tag(tag);
276     if (!ctx_->sent_initial_metadata_) {
277       finish_buf_.SendInitialMetadata(ctx_->initial_metadata_,
278                                       ctx_->initial_metadata_flags());
279       if (ctx_->compression_level_set()) {
280         finish_buf_.set_compression_level(ctx_->compression_level());
281       }
282       ctx_->sent_initial_metadata_ = true;
283     }
284     finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
285     call_.PerformOps(&finish_buf_);
286   }
287 
288  private:
BindCall(::grpc::internal::Call * call)289   void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
290 
291   ::grpc::internal::Call call_;
292   ServerContext* ctx_;
293   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
294       meta_buf_;
295   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
296                               ::grpc::internal::CallOpSendMessage,
297                               ::grpc::internal::CallOpServerSendStatus>
298       finish_buf_;
299 };
300 
301 }  // namespace grpc
302 
303 namespace std {
304 template <class R>
305 class default_delete<grpc::ClientAsyncResponseReader<R>> {
306  public:
operator()307   void operator()(void* p) {}
308 };
309 template <class R>
310 class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> {
311  public:
operator()312   void operator()(void* p) {}
313 };
314 }  // namespace std
315 
316 #endif  // GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
317