1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
20 #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
21 
22 #include <grpcpp/impl/codegen/call.h>
23 #include <grpcpp/impl/codegen/channel_interface.h>
24 #include <grpcpp/impl/codegen/client_context.h>
25 #include <grpcpp/impl/codegen/completion_queue.h>
26 #include <grpcpp/impl/codegen/core_codegen_interface.h>
27 #include <grpcpp/impl/codegen/server_context.h>
28 #include <grpcpp/impl/codegen/service_type.h>
29 #include <grpcpp/impl/codegen/status.h>
30 
31 namespace grpc {
32 
33 namespace internal {
34 /// Common interface for all synchronous client side streaming.
35 class ClientStreamingInterface {
36  public:
~ClientStreamingInterface()37   virtual ~ClientStreamingInterface() {}
38 
39   /// Block waiting until the stream finishes and a final status of the call is
40   /// available.
41   ///
42   /// It is appropriate to call this method when both:
43   ///   * the calling code (client-side) has no more message to send
44   ///     (this can be declared implicitly by calling this method, or
45   ///     explicitly through an earlier call to <i>WritesDone</i> method of the
46   ///     class in use, e.g. \a ClientWriterInterface::WritesDone or
47   ///     \a ClientReaderWriterInterface::WritesDone).
48   ///   * there are no more messages to be received from the server (which can
49   ///     be known implicitly, or explicitly from an earlier call to \a
50   ///     ReaderInterface::Read that returned "false").
51   ///
52   /// This function will return either:
53   /// - when all incoming messages have been read and the server has
54   ///   returned status.
55   /// - when the server has returned a non-OK status.
56   /// - OR when the call failed for some reason and the library generated a
57   ///   status.
58   ///
59   /// Return values:
60   ///   - \a Status contains the status code, message and details for the call
61   ///   - the \a ClientContext associated with this call is updated with
62   ///     possible trailing metadata sent from the server.
63   virtual Status Finish() = 0;
64 };
65 
66 /// Common interface for all synchronous server side streaming.
67 class ServerStreamingInterface {
68  public:
~ServerStreamingInterface()69   virtual ~ServerStreamingInterface() {}
70 
71   /// Block to send initial metadata to client.
72   /// This call is optional, but if it is used, it cannot be used concurrently
73   /// with or after the \a Finish method.
74   ///
75   /// The initial metadata that will be sent to the client will be
76   /// taken from the \a ServerContext associated with the call.
77   virtual void SendInitialMetadata() = 0;
78 };
79 
80 /// An interface that yields a sequence of messages of type \a R.
81 template <class R>
82 class ReaderInterface {
83  public:
~ReaderInterface()84   virtual ~ReaderInterface() {}
85 
86   /// Get an upper bound on the next message size available for reading on this
87   /// stream.
88   virtual bool NextMessageSize(uint32_t* sz) = 0;
89 
90   /// Block to read a message and parse to \a msg. Returns \a true on success.
91   /// This is thread-safe with respect to \a Write or \WritesDone methods on
92   /// the same stream. It should not be called concurrently with another \a
93   /// Read on the same stream as the order of delivery will not be defined.
94   ///
95   /// \param[out] msg The read message.
96   ///
97   /// \return \a false when there will be no more incoming messages, either
98   /// because the other side has called \a WritesDone() or the stream has failed
99   /// (or been cancelled).
100   virtual bool Read(R* msg) = 0;
101 };
102 
103 /// An interface that can be fed a sequence of messages of type \a W.
104 template <class W>
105 class WriterInterface {
106  public:
~WriterInterface()107   virtual ~WriterInterface() {}
108 
109   /// Block to write \a msg to the stream with WriteOptions \a options.
110   /// This is thread-safe with respect to \a ReaderInterface::Read
111   ///
112   /// \param msg The message to be written to the stream.
113   /// \param options The WriteOptions affecting the write operation.
114   ///
115   /// \return \a true on success, \a false when the stream has been closed.
116   virtual bool Write(const W& msg, WriteOptions options) = 0;
117 
118   /// Block to write \a msg to the stream with default write options.
119   /// This is thread-safe with respect to \a ReaderInterface::Read
120   ///
121   /// \param msg The message to be written to the stream.
122   ///
123   /// \return \a true on success, \a false when the stream has been closed.
Write(const W & msg)124   inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
125 
126   /// Write \a msg and coalesce it with the writing of trailing metadata, using
127   /// WriteOptions \a options.
128   ///
129   /// For client, WriteLast is equivalent of performing Write and WritesDone in
130   /// a single step. \a msg and trailing metadata are coalesced and sent on wire
131   /// by calling this function. For server, WriteLast buffers the \a msg.
132   /// The writing of \a msg is held until the service handler returns,
133   /// where \a msg and trailing metadata are coalesced and sent on wire.
134   /// Note that WriteLast can only buffer \a msg up to the flow control window
135   /// size. If \a msg size is larger than the window size, it will be sent on
136   /// wire without buffering.
137   ///
138   /// \param[in] msg The message to be written to the stream.
139   /// \param[in] options The WriteOptions to be used to write this message.
WriteLast(const W & msg,WriteOptions options)140   void WriteLast(const W& msg, WriteOptions options) {
141     Write(msg, options.set_last_message());
142   }
143 };
144 
145 }  // namespace internal
146 
147 /// Client-side interface for streaming reads of message of type \a R.
148 template <class R>
149 class ClientReaderInterface : public internal::ClientStreamingInterface,
150                               public internal::ReaderInterface<R> {
151  public:
152   /// Block to wait for initial metadata from server. The received metadata
153   /// can only be accessed after this call returns. Should only be called before
154   /// the first read. Calling this method is optional, and if it is not called
155   /// the metadata will be available in ClientContext after the first read.
156   virtual void WaitForInitialMetadata() = 0;
157 };
158 
159 namespace internal {
160 template <class R>
161 class ClientReaderFactory {
162  public:
163   template <class W>
Create(ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context,const W & request)164   static ClientReader<R>* Create(ChannelInterface* channel,
165                                  const ::grpc::internal::RpcMethod& method,
166                                  ClientContext* context, const W& request) {
167     return new ClientReader<R>(channel, method, context, request);
168   }
169 };
170 }  // namespace internal
171 
172 /// Synchronous (blocking) client-side API for doing server-streaming RPCs,
173 /// where the stream of messages coming from the server has messages
174 /// of type \a R.
175 template <class R>
176 class ClientReader final : public ClientReaderInterface<R> {
177  public:
178   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
179   /// semantics.
180   ///
181   //  Side effect:
182   ///   Once complete, the initial metadata read from
183   ///   the server will be accessable through the \a ClientContext used to
184   ///   construct this object.
WaitForInitialMetadata()185   void WaitForInitialMetadata() override {
186     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
187 
188     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
189         ops;
190     ops.RecvInitialMetadata(context_);
191     call_.PerformOps(&ops);
192     cq_.Pluck(&ops);  /// status ignored
193   }
194 
NextMessageSize(uint32_t * sz)195   bool NextMessageSize(uint32_t* sz) override {
196     *sz = call_.max_receive_message_size();
197     return true;
198   }
199 
200   /// See the \a ReaderInterface.Read method for semantics.
201   /// Side effect:
202   ///   This also receives initial metadata from the server, if not
203   ///   already received (if initial metadata is received, it can be then
204   ///   accessed through the \a ClientContext associated with this call).
Read(R * msg)205   bool Read(R* msg) override {
206     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
207                                 ::grpc::internal::CallOpRecvMessage<R>>
208         ops;
209     if (!context_->initial_metadata_received_) {
210       ops.RecvInitialMetadata(context_);
211     }
212     ops.RecvMessage(msg);
213     call_.PerformOps(&ops);
214     return cq_.Pluck(&ops) && ops.got_message;
215   }
216 
217   /// See the \a ClientStreamingInterface.Finish method for semantics.
218   ///
219   /// Side effect:
220   ///   The \a ClientContext associated with this call is updated with
221   ///   possible metadata received from the server.
Finish()222   Status Finish() override {
223     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops;
224     Status status;
225     ops.ClientRecvStatus(context_, &status);
226     call_.PerformOps(&ops);
227     GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
228     return status;
229   }
230 
231  private:
232   friend class internal::ClientReaderFactory<R>;
233   ClientContext* context_;
234   CompletionQueue cq_;
235   ::grpc::internal::Call call_;
236 
237   /// Block to create a stream and write the initial metadata and \a request
238   /// out. Note that \a context will be used to fill in custom initial
239   /// metadata used to send to the server when starting the call.
240   template <class W>
ClientReader(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context,const W & request)241   ClientReader(::grpc::ChannelInterface* channel,
242                const ::grpc::internal::RpcMethod& method,
243                ClientContext* context, const W& request)
244       : context_(context),
245         cq_(grpc_completion_queue_attributes{
246             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
247             nullptr}),  // Pluckable cq
248         call_(channel->CreateCall(method, context, &cq_)) {
249     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
250                                 ::grpc::internal::CallOpSendMessage,
251                                 ::grpc::internal::CallOpClientSendClose>
252         ops;
253     ops.SendInitialMetadata(context->send_initial_metadata_,
254                             context->initial_metadata_flags());
255     // TODO(ctiller): don't assert
256     GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
257     ops.ClientSendClose();
258     call_.PerformOps(&ops);
259     cq_.Pluck(&ops);
260   }
261 };
262 
263 /// Client-side interface for streaming writes of message type \a W.
264 template <class W>
265 class ClientWriterInterface : public internal::ClientStreamingInterface,
266                               public internal::WriterInterface<W> {
267  public:
268   /// Half close writing from the client. (signal that the stream of messages
269   /// coming from the client is complete).
270   /// Blocks until currently-pending writes are completed.
271   /// Thread safe with respect to \a ReaderInterface::Read operations only
272   ///
273   /// \return Whether the writes were successful.
274   virtual bool WritesDone() = 0;
275 };
276 
277 namespace internal {
278 template <class W>
279 class ClientWriterFactory {
280  public:
281   template <class R>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context,R * response)282   static ClientWriter<W>* Create(::grpc::ChannelInterface* channel,
283                                  const ::grpc::internal::RpcMethod& method,
284                                  ClientContext* context, R* response) {
285     return new ClientWriter<W>(channel, method, context, response);
286   }
287 };
288 }  // namespace internal
289 
290 /// Synchronous (blocking) client-side API for doing client-streaming RPCs,
291 /// where the outgoing message stream coming from the client has messages of
292 /// type \a W.
293 template <class W>
294 class ClientWriter : public ClientWriterInterface<W> {
295  public:
296   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
297   /// semantics.
298   ///
299   //  Side effect:
300   ///   Once complete, the initial metadata read from the server will be
301   ///   accessable through the \a ClientContext used to construct this object.
WaitForInitialMetadata()302   void WaitForInitialMetadata() {
303     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
304 
305     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
306         ops;
307     ops.RecvInitialMetadata(context_);
308     call_.PerformOps(&ops);
309     cq_.Pluck(&ops);  // status ignored
310   }
311 
312   /// See the WriterInterface.Write(const W& msg, WriteOptions options) method
313   /// for semantics.
314   ///
315   /// Side effect:
316   ///   Also sends initial metadata if not already sent (using the
317   ///   \a ClientContext associated with this call).
318   using ::grpc::internal::WriterInterface<W>::Write;
Write(const W & msg,WriteOptions options)319   bool Write(const W& msg, WriteOptions options) override {
320     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
321                                 ::grpc::internal::CallOpSendMessage,
322                                 ::grpc::internal::CallOpClientSendClose>
323         ops;
324 
325     if (options.is_last_message()) {
326       options.set_buffer_hint();
327       ops.ClientSendClose();
328     }
329     if (context_->initial_metadata_corked_) {
330       ops.SendInitialMetadata(context_->send_initial_metadata_,
331                               context_->initial_metadata_flags());
332       context_->set_initial_metadata_corked(false);
333     }
334     if (!ops.SendMessage(msg, options).ok()) {
335       return false;
336     }
337 
338     call_.PerformOps(&ops);
339     return cq_.Pluck(&ops);
340   }
341 
WritesDone()342   bool WritesDone() override {
343     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
344     ops.ClientSendClose();
345     call_.PerformOps(&ops);
346     return cq_.Pluck(&ops);
347   }
348 
349   /// See the ClientStreamingInterface.Finish method for semantics.
350   /// Side effects:
351   ///   - Also receives initial metadata if not already received.
352   ///   - Attempts to fill in the \a response parameter passed
353   ///     to the constructor of this instance with the response
354   ///     message from the server.
Finish()355   Status Finish() override {
356     Status status;
357     if (!context_->initial_metadata_received_) {
358       finish_ops_.RecvInitialMetadata(context_);
359     }
360     finish_ops_.ClientRecvStatus(context_, &status);
361     call_.PerformOps(&finish_ops_);
362     GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_));
363     return status;
364   }
365 
366  private:
367   friend class internal::ClientWriterFactory<W>;
368 
369   /// Block to create a stream (i.e. send request headers and other initial
370   /// metadata to the server). Note that \a context will be used to fill
371   /// in custom initial metadata. \a response will be filled in with the
372   /// single expected response message from the server upon a successful
373   /// call to the \a Finish method of this instance.
374   template <class R>
ClientWriter(ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context,R * response)375   ClientWriter(ChannelInterface* channel,
376                const ::grpc::internal::RpcMethod& method,
377                ClientContext* context, R* response)
378       : context_(context),
379         cq_(grpc_completion_queue_attributes{
380             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
381             nullptr}),  // Pluckable cq
382         call_(channel->CreateCall(method, context, &cq_)) {
383     finish_ops_.RecvMessage(response);
384     finish_ops_.AllowNoMessage();
385 
386     if (!context_->initial_metadata_corked_) {
387       ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
388           ops;
389       ops.SendInitialMetadata(context->send_initial_metadata_,
390                               context->initial_metadata_flags());
391       call_.PerformOps(&ops);
392       cq_.Pluck(&ops);
393     }
394   }
395 
396   ClientContext* context_;
397   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
398                               ::grpc::internal::CallOpGenericRecvMessage,
399                               ::grpc::internal::CallOpClientRecvStatus>
400       finish_ops_;
401   CompletionQueue cq_;
402   ::grpc::internal::Call call_;
403 };
404 
405 /// Client-side interface for bi-directional streaming with
406 /// client-to-server stream messages of type \a W and
407 /// server-to-client stream messages of type \a R.
408 template <class W, class R>
409 class ClientReaderWriterInterface : public internal::ClientStreamingInterface,
410                                     public internal::WriterInterface<W>,
411                                     public internal::ReaderInterface<R> {
412  public:
413   /// Block to wait for initial metadata from server. The received metadata
414   /// can only be accessed after this call returns. Should only be called before
415   /// the first read. Calling this method is optional, and if it is not called
416   /// the metadata will be available in ClientContext after the first read.
417   virtual void WaitForInitialMetadata() = 0;
418 
419   /// Half close writing from the client. (signal that the stream of messages
420   /// coming from the clinet is complete).
421   /// Blocks until currently-pending writes are completed.
422   /// Thread-safe with respect to \a ReaderInterface::Read
423   ///
424   /// \return Whether the writes were successful.
425   virtual bool WritesDone() = 0;
426 };
427 
428 namespace internal {
429 template <class W, class R>
430 class ClientReaderWriterFactory {
431  public:
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context)432   static ClientReaderWriter<W, R>* Create(
433       ::grpc::ChannelInterface* channel,
434       const ::grpc::internal::RpcMethod& method, ClientContext* context) {
435     return new ClientReaderWriter<W, R>(channel, method, context);
436   }
437 };
438 }  // namespace internal
439 
440 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs,
441 /// where the outgoing message stream coming from the client has messages of
442 /// type \a W, and the incoming messages stream coming from the server has
443 /// messages of type \a R.
444 template <class W, class R>
445 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
446  public:
447   /// Block waiting to read initial metadata from the server.
448   /// This call is optional, but if it is used, it cannot be used concurrently
449   /// with or after the \a Finish method.
450   ///
451   /// Once complete, the initial metadata read from the server will be
452   /// accessable through the \a ClientContext used to construct this object.
WaitForInitialMetadata()453   void WaitForInitialMetadata() override {
454     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
455 
456     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
457         ops;
458     ops.RecvInitialMetadata(context_);
459     call_.PerformOps(&ops);
460     cq_.Pluck(&ops);  // status ignored
461   }
462 
NextMessageSize(uint32_t * sz)463   bool NextMessageSize(uint32_t* sz) override {
464     *sz = call_.max_receive_message_size();
465     return true;
466   }
467 
468   /// See the \a ReaderInterface.Read method for semantics.
469   /// Side effect:
470   ///   Also receives initial metadata if not already received (updates the \a
471   ///   ClientContext associated with this call in that case).
Read(R * msg)472   bool Read(R* msg) override {
473     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
474                                 ::grpc::internal::CallOpRecvMessage<R>>
475         ops;
476     if (!context_->initial_metadata_received_) {
477       ops.RecvInitialMetadata(context_);
478     }
479     ops.RecvMessage(msg);
480     call_.PerformOps(&ops);
481     return cq_.Pluck(&ops) && ops.got_message;
482   }
483 
484   /// See the \a WriterInterface.Write method for semantics.
485   ///
486   /// Side effect:
487   ///   Also sends initial metadata if not already sent (using the
488   ///   \a ClientContext associated with this call to fill in values).
489   using ::grpc::internal::WriterInterface<W>::Write;
Write(const W & msg,WriteOptions options)490   bool Write(const W& msg, WriteOptions options) override {
491     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
492                                 ::grpc::internal::CallOpSendMessage,
493                                 ::grpc::internal::CallOpClientSendClose>
494         ops;
495 
496     if (options.is_last_message()) {
497       options.set_buffer_hint();
498       ops.ClientSendClose();
499     }
500     if (context_->initial_metadata_corked_) {
501       ops.SendInitialMetadata(context_->send_initial_metadata_,
502                               context_->initial_metadata_flags());
503       context_->set_initial_metadata_corked(false);
504     }
505     if (!ops.SendMessage(msg, options).ok()) {
506       return false;
507     }
508 
509     call_.PerformOps(&ops);
510     return cq_.Pluck(&ops);
511   }
512 
WritesDone()513   bool WritesDone() override {
514     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
515     ops.ClientSendClose();
516     call_.PerformOps(&ops);
517     return cq_.Pluck(&ops);
518   }
519 
520   /// See the ClientStreamingInterface.Finish method for semantics.
521   ///
522   /// Side effect:
523   ///   - the \a ClientContext associated with this call is updated with
524   ///     possible trailing metadata sent from the server.
Finish()525   Status Finish() override {
526     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
527                                 ::grpc::internal::CallOpClientRecvStatus>
528         ops;
529     if (!context_->initial_metadata_received_) {
530       ops.RecvInitialMetadata(context_);
531     }
532     Status status;
533     ops.ClientRecvStatus(context_, &status);
534     call_.PerformOps(&ops);
535     GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
536     return status;
537   }
538 
539  private:
540   friend class internal::ClientReaderWriterFactory<W, R>;
541 
542   ClientContext* context_;
543   CompletionQueue cq_;
544   ::grpc::internal::Call call_;
545 
546   /// Block to create a stream and write the initial metadata and \a request
547   /// out. Note that \a context will be used to fill in custom initial metadata
548   /// used to send to the server when starting the call.
ClientReaderWriter(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context)549   ClientReaderWriter(::grpc::ChannelInterface* channel,
550                      const ::grpc::internal::RpcMethod& method,
551                      ClientContext* context)
552       : context_(context),
553         cq_(grpc_completion_queue_attributes{
554             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
555             nullptr}),  // Pluckable cq
556         call_(channel->CreateCall(method, context, &cq_)) {
557     if (!context_->initial_metadata_corked_) {
558       ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
559           ops;
560       ops.SendInitialMetadata(context->send_initial_metadata_,
561                               context->initial_metadata_flags());
562       call_.PerformOps(&ops);
563       cq_.Pluck(&ops);
564     }
565   }
566 };
567 
568 /// Server-side interface for streaming reads of message of type \a R.
569 template <class R>
570 class ServerReaderInterface : public internal::ServerStreamingInterface,
571                               public internal::ReaderInterface<R> {};
572 
573 /// Synchronous (blocking) server-side API for doing client-streaming RPCs,
574 /// where the incoming message stream coming from the client has messages of
575 /// type \a R.
576 template <class R>
577 class ServerReader final : public ServerReaderInterface<R> {
578  public:
579   /// See the \a ServerStreamingInterface.SendInitialMetadata method
580   /// for semantics. Note that initial metadata will be affected by the
581   /// \a ServerContext associated with this call.
SendInitialMetadata()582   void SendInitialMetadata() override {
583     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
584 
585     internal::CallOpSet<internal::CallOpSendInitialMetadata> ops;
586     ops.SendInitialMetadata(ctx_->initial_metadata_,
587                             ctx_->initial_metadata_flags());
588     if (ctx_->compression_level_set()) {
589       ops.set_compression_level(ctx_->compression_level());
590     }
591     ctx_->sent_initial_metadata_ = true;
592     call_->PerformOps(&ops);
593     call_->cq()->Pluck(&ops);
594   }
595 
NextMessageSize(uint32_t * sz)596   bool NextMessageSize(uint32_t* sz) override {
597     *sz = call_->max_receive_message_size();
598     return true;
599   }
600 
Read(R * msg)601   bool Read(R* msg) override {
602     internal::CallOpSet<internal::CallOpRecvMessage<R>> ops;
603     ops.RecvMessage(msg);
604     call_->PerformOps(&ops);
605     return call_->cq()->Pluck(&ops) && ops.got_message;
606   }
607 
608  private:
609   internal::Call* const call_;
610   ServerContext* const ctx_;
611 
612   template <class ServiceType, class RequestType, class ResponseType>
613   friend class internal::ClientStreamingHandler;
614 
ServerReader(internal::Call * call,ServerContext * ctx)615   ServerReader(internal::Call* call, ServerContext* ctx)
616       : call_(call), ctx_(ctx) {}
617 };
618 
619 /// Server-side interface for streaming writes of message of type \a W.
620 template <class W>
621 class ServerWriterInterface : public internal::ServerStreamingInterface,
622                               public internal::WriterInterface<W> {};
623 
624 /// Synchronous (blocking) server-side API for doing for doing a
625 /// server-streaming RPCs, where the outgoing message stream coming from the
626 /// server has messages of type \a W.
627 template <class W>
628 class ServerWriter final : public ServerWriterInterface<W> {
629  public:
630   /// See the \a ServerStreamingInterface.SendInitialMetadata method
631   /// for semantics.
632   /// Note that initial metadata will be affected by the
633   /// \a ServerContext associated with this call.
SendInitialMetadata()634   void SendInitialMetadata() override {
635     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
636 
637     internal::CallOpSet<internal::CallOpSendInitialMetadata> ops;
638     ops.SendInitialMetadata(ctx_->initial_metadata_,
639                             ctx_->initial_metadata_flags());
640     if (ctx_->compression_level_set()) {
641       ops.set_compression_level(ctx_->compression_level());
642     }
643     ctx_->sent_initial_metadata_ = true;
644     call_->PerformOps(&ops);
645     call_->cq()->Pluck(&ops);
646   }
647 
648   /// See the \a WriterInterface.Write method for semantics.
649   ///
650   /// Side effect:
651   ///   Also sends initial metadata if not already sent (using the
652   ///   \a ClientContext associated with this call to fill in values).
653   using internal::WriterInterface<W>::Write;
Write(const W & msg,WriteOptions options)654   bool Write(const W& msg, WriteOptions options) override {
655     if (options.is_last_message()) {
656       options.set_buffer_hint();
657     }
658 
659     if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) {
660       return false;
661     }
662     if (!ctx_->sent_initial_metadata_) {
663       ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_,
664                                              ctx_->initial_metadata_flags());
665       if (ctx_->compression_level_set()) {
666         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
667       }
668       ctx_->sent_initial_metadata_ = true;
669     }
670     call_->PerformOps(&ctx_->pending_ops_);
671     // if this is the last message we defer the pluck until AFTER we start
672     // the trailing md op. This prevents hangs. See
673     // https://github.com/grpc/grpc/issues/11546
674     if (options.is_last_message()) {
675       ctx_->has_pending_ops_ = true;
676       return true;
677     }
678     ctx_->has_pending_ops_ = false;
679     return call_->cq()->Pluck(&ctx_->pending_ops_);
680   }
681 
682  private:
683   internal::Call* const call_;
684   ServerContext* const ctx_;
685 
686   template <class ServiceType, class RequestType, class ResponseType>
687   friend class internal::ServerStreamingHandler;
688 
ServerWriter(internal::Call * call,ServerContext * ctx)689   ServerWriter(internal::Call* call, ServerContext* ctx)
690       : call_(call), ctx_(ctx) {}
691 };
692 
693 /// Server-side interface for bi-directional streaming.
694 template <class W, class R>
695 class ServerReaderWriterInterface : public internal::ServerStreamingInterface,
696                                     public internal::WriterInterface<W>,
697                                     public internal::ReaderInterface<R> {};
698 
699 /// Actual implementation of bi-directional streaming
700 namespace internal {
701 template <class W, class R>
702 class ServerReaderWriterBody final {
703  public:
ServerReaderWriterBody(Call * call,ServerContext * ctx)704   ServerReaderWriterBody(Call* call, ServerContext* ctx)
705       : call_(call), ctx_(ctx) {}
706 
SendInitialMetadata()707   void SendInitialMetadata() {
708     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
709 
710     CallOpSet<CallOpSendInitialMetadata> ops;
711     ops.SendInitialMetadata(ctx_->initial_metadata_,
712                             ctx_->initial_metadata_flags());
713     if (ctx_->compression_level_set()) {
714       ops.set_compression_level(ctx_->compression_level());
715     }
716     ctx_->sent_initial_metadata_ = true;
717     call_->PerformOps(&ops);
718     call_->cq()->Pluck(&ops);
719   }
720 
NextMessageSize(uint32_t * sz)721   bool NextMessageSize(uint32_t* sz) {
722     *sz = call_->max_receive_message_size();
723     return true;
724   }
725 
Read(R * msg)726   bool Read(R* msg) {
727     CallOpSet<CallOpRecvMessage<R>> ops;
728     ops.RecvMessage(msg);
729     call_->PerformOps(&ops);
730     return call_->cq()->Pluck(&ops) && ops.got_message;
731   }
732 
Write(const W & msg,WriteOptions options)733   bool Write(const W& msg, WriteOptions options) {
734     if (options.is_last_message()) {
735       options.set_buffer_hint();
736     }
737     if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) {
738       return false;
739     }
740     if (!ctx_->sent_initial_metadata_) {
741       ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_,
742                                              ctx_->initial_metadata_flags());
743       if (ctx_->compression_level_set()) {
744         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
745       }
746       ctx_->sent_initial_metadata_ = true;
747     }
748     call_->PerformOps(&ctx_->pending_ops_);
749     // if this is the last message we defer the pluck until AFTER we start
750     // the trailing md op. This prevents hangs. See
751     // https://github.com/grpc/grpc/issues/11546
752     if (options.is_last_message()) {
753       ctx_->has_pending_ops_ = true;
754       return true;
755     }
756     ctx_->has_pending_ops_ = false;
757     return call_->cq()->Pluck(&ctx_->pending_ops_);
758   }
759 
760  private:
761   Call* const call_;
762   ServerContext* const ctx_;
763 };
764 
765 }  // namespace internal
766 
767 /// Synchronous (blocking) server-side API for a bidirectional
768 /// streaming call, where the incoming message stream coming from the client has
769 /// messages of type \a R, and the outgoing message streaming coming from
770 /// the server has messages of type \a W.
771 template <class W, class R>
772 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
773  public:
774   /// See the \a ServerStreamingInterface.SendInitialMetadata method
775   /// for semantics. Note that initial metadata will be affected by the
776   /// \a ServerContext associated with this call.
SendInitialMetadata()777   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
778 
NextMessageSize(uint32_t * sz)779   bool NextMessageSize(uint32_t* sz) override {
780     return body_.NextMessageSize(sz);
781   }
782 
Read(R * msg)783   bool Read(R* msg) override { return body_.Read(msg); }
784 
785   /// See the \a WriterInterface.Write(const W& msg, WriteOptions options)
786   /// method for semantics.
787   /// Side effect:
788   ///   Also sends initial metadata if not already sent (using the \a
789   ///   ServerContext associated with this call).
790   using internal::WriterInterface<W>::Write;
Write(const W & msg,WriteOptions options)791   bool Write(const W& msg, WriteOptions options) override {
792     return body_.Write(msg, options);
793   }
794 
795  private:
796   internal::ServerReaderWriterBody<W, R> body_;
797 
798   friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>,
799                                                        false>;
ServerReaderWriter(internal::Call * call,ServerContext * ctx)800   ServerReaderWriter(internal::Call* call, ServerContext* ctx)
801       : body_(call, ctx) {}
802 };
803 
804 /// A class to represent a flow-controlled unary call. This is something
805 /// of a hybrid between conventional unary and streaming. This is invoked
806 /// through a unary call on the client side, but the server responds to it
807 /// as though it were a single-ping-pong streaming call. The server can use
808 /// the \a NextMessageSize method to determine an upper-bound on the size of
809 /// the message. A key difference relative to streaming: ServerUnaryStreamer
810 /// must have exactly 1 Read and exactly 1 Write, in that order, to function
811 /// correctly. Otherwise, the RPC is in error.
812 template <class RequestType, class ResponseType>
813 class ServerUnaryStreamer final
814     : public ServerReaderWriterInterface<ResponseType, RequestType> {
815  public:
816   /// Block to send initial metadata to client.
817   /// Implicit input parameter:
818   ///    - the \a ServerContext associated with this call will be used for
819   ///      sending initial metadata.
SendInitialMetadata()820   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
821 
822   /// Get an upper bound on the request message size from the client.
NextMessageSize(uint32_t * sz)823   bool NextMessageSize(uint32_t* sz) override {
824     return body_.NextMessageSize(sz);
825   }
826 
827   /// Read a message of type \a R into \a msg. Completion will be notified by \a
828   /// tag on the associated completion queue.
829   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
830   /// should not be called concurrently with other streaming APIs
831   /// on the same stream. It is not meaningful to call it concurrently
832   /// with another \a ReaderInterface::Read on the same stream since reads on
833   /// the same stream are delivered in order.
834   ///
835   /// \param[out] msg Where to eventually store the read message.
836   /// \param[in] tag The tag identifying the operation.
Read(RequestType * request)837   bool Read(RequestType* request) override {
838     if (read_done_) {
839       return false;
840     }
841     read_done_ = true;
842     return body_.Read(request);
843   }
844 
845   /// Block to write \a msg to the stream with WriteOptions \a options.
846   /// This is thread-safe with respect to \a ReaderInterface::Read
847   ///
848   /// \param msg The message to be written to the stream.
849   /// \param options The WriteOptions affecting the write operation.
850   ///
851   /// \return \a true on success, \a false when the stream has been closed.
852   using internal::WriterInterface<ResponseType>::Write;
Write(const ResponseType & response,WriteOptions options)853   bool Write(const ResponseType& response, WriteOptions options) override {
854     if (write_done_ || !read_done_) {
855       return false;
856     }
857     write_done_ = true;
858     return body_.Write(response, options);
859   }
860 
861  private:
862   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
863   bool read_done_;
864   bool write_done_;
865 
866   friend class internal::TemplatedBidiStreamingHandler<
867       ServerUnaryStreamer<RequestType, ResponseType>, true>;
ServerUnaryStreamer(internal::Call * call,ServerContext * ctx)868   ServerUnaryStreamer(internal::Call* call, ServerContext* ctx)
869       : body_(call, ctx), read_done_(false), write_done_(false) {}
870 };
871 
872 /// A class to represent a flow-controlled server-side streaming call.
873 /// This is something of a hybrid between server-side and bidi streaming.
874 /// This is invoked through a server-side streaming call on the client side,
875 /// but the server responds to it as though it were a bidi streaming call that
876 /// must first have exactly 1 Read and then any number of Writes.
877 template <class RequestType, class ResponseType>
878 class ServerSplitStreamer final
879     : public ServerReaderWriterInterface<ResponseType, RequestType> {
880  public:
881   /// Block to send initial metadata to client.
882   /// Implicit input parameter:
883   ///    - the \a ServerContext associated with this call will be used for
884   ///      sending initial metadata.
SendInitialMetadata()885   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
886 
887   /// Get an upper bound on the request message size from the client.
NextMessageSize(uint32_t * sz)888   bool NextMessageSize(uint32_t* sz) override {
889     return body_.NextMessageSize(sz);
890   }
891 
892   /// Read a message of type \a R into \a msg. Completion will be notified by \a
893   /// tag on the associated completion queue.
894   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
895   /// should not be called concurrently with other streaming APIs
896   /// on the same stream. It is not meaningful to call it concurrently
897   /// with another \a ReaderInterface::Read on the same stream since reads on
898   /// the same stream are delivered in order.
899   ///
900   /// \param[out] msg Where to eventually store the read message.
901   /// \param[in] tag The tag identifying the operation.
Read(RequestType * request)902   bool Read(RequestType* request) override {
903     if (read_done_) {
904       return false;
905     }
906     read_done_ = true;
907     return body_.Read(request);
908   }
909 
910   /// Block to write \a msg to the stream with WriteOptions \a options.
911   /// This is thread-safe with respect to \a ReaderInterface::Read
912   ///
913   /// \param msg The message to be written to the stream.
914   /// \param options The WriteOptions affecting the write operation.
915   ///
916   /// \return \a true on success, \a false when the stream has been closed.
917   using internal::WriterInterface<ResponseType>::Write;
Write(const ResponseType & response,WriteOptions options)918   bool Write(const ResponseType& response, WriteOptions options) override {
919     return read_done_ && body_.Write(response, options);
920   }
921 
922  private:
923   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
924   bool read_done_;
925 
926   friend class internal::TemplatedBidiStreamingHandler<
927       ServerSplitStreamer<RequestType, ResponseType>, false>;
ServerSplitStreamer(internal::Call * call,ServerContext * ctx)928   ServerSplitStreamer(internal::Call* call, ServerContext* ctx)
929       : body_(call, ctx), read_done_(false) {}
930 };
931 
932 }  // namespace grpc
933 
934 #endif  // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
935