1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require 'forwardable'
16require 'weakref'
17require_relative 'bidi_call'
18
class Struct
  # BatchResult is the struct returned by calls to call#start_batch.
  class BatchResult
    # Validates the status carried by this batch result.
    #
    # @return [Object, nil] the status when its code is OK, or nil when the
    #   batch result carries no status at all
    # @raise [GRPC::Cancelled] when the status code is CANCELLED
    # @raise [GRPC::BadStatus] (as built by BadStatus.new_status_exception)
    #   for every other non-OK code; the status metadata is passed along
    def check_status
      current = status
      return nil if current.nil?

      codes = GRPC::Core::StatusCodes
      fail GRPC::Cancelled if current.code == codes::CANCELLED
      return current if current.code == codes::OK

      GRPC.logger.debug("Failing with status #{current}")
      # Hand the metadata to the exception so callers can inspect it.
      fail GRPC::BadStatus.new_status_exception(
        current.code, current.details, current.metadata)
    end
  end
end
38
39# GRPC contains the General RPC module.
40module GRPC
41  # The ActiveCall class provides simple methods for sending marshallable
42  # data to a call
43  class ActiveCall # rubocop:disable Metrics/ClassLength
    # Mixed-in constant namespaces. The batch-op constants referenced
    # unqualified throughout this class (SEND_MESSAGE, RECV_STATUS_ON_CLIENT,
    # ...) are presumably supplied by Core::CallOps -- verify there.
    include Core::TimeConsts
    include Core::CallOps
    extend Forwardable
    # Read-only accessors for state held by this ActiveCall.
    attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
    # These methods are forwarded unchanged to the wrapped core call (@call).
    def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
                   :trailing_metadata, :status
50
51    # client_invoke begins a client invocation.
52    #
53    # Flow Control note: this blocks until flow control accepts that client
54    # request can go ahead.
55    #
56    # deadline is the absolute deadline for the call.
57    #
58    # == Keyword Arguments ==
59    # any keyword arguments are treated as metadata to be sent to the server
    # if a keyword value is a list, multiple metadata for its key are sent
    #
    # @param call [Call] a call on which to start an invocation
63    # @param metadata [Hash] the metadata
64    def self.client_invoke(call, metadata = {})
65      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
66      call.run_batch(SEND_INITIAL_METADATA => metadata)
67    end
68
69    # Creates an ActiveCall.
70    #
71    # ActiveCall should only be created after a call is accepted.  That
72    # means different things on a client and a server.  On the client, the
73    # call is accepted after calling call.invoke.  On the server, this is
74    # after call.accept.
75    #
76    # #initialize cannot determine if the call is accepted or not; so if a
77    # call that's not accepted is used here, the error won't be visible until
78    # the ActiveCall methods are called.
79    #
80    # deadline is the absolute deadline for the call.
81    #
82    # @param call [Call] the call used by the ActiveCall
83    # @param marshal [Function] f(obj)->string that marshal requests
84    # @param unmarshal [Function] f(string)->obj that unmarshals responses
85    # @param deadline [Fixnum] the deadline for the call to complete
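    # @param started [true|false] indicates that metadata was sent
    # @param metadata_received [true|false] indicates if metadata has already
    #     been received. Should always be true for server calls
    # @param metadata_to_send [Hash, nil] initial metadata queued for sending;
    #     may only be given when started is false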
89    def initialize(call, marshal, unmarshal, deadline, started: true,
90                   metadata_received: false, metadata_to_send: nil)
91      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
92      @call = call
93      @deadline = deadline
94      @marshal = marshal
95      @unmarshal = unmarshal
96      @metadata_received = metadata_received
97      @metadata_sent = started
98      @op_notifier = nil
99
100      fail(ArgumentError, 'Already sent md') if started && metadata_to_send
101      @metadata_to_send = metadata_to_send || {} unless started
102      @send_initial_md_mutex = Mutex.new
103
104      @output_stream_done = false
105      @input_stream_done = false
106      @call_finished = false
107      @call_finished_mu = Mutex.new
108
109      @client_call_executed = false
110      @client_call_executed_mu = Mutex.new
111
112      # set the peer now so that the accessor can still function
113      # after the server closes the call
114      @peer = call.peer
115    end
116
117    # Sends the initial metadata that has yet to be sent.
118    # Does nothing if metadata has already been sent for this call.
119    def send_initial_metadata(new_metadata = {})
120      @send_initial_md_mutex.synchronize do
121        return if @metadata_sent
122        @metadata_to_send.merge!(new_metadata)
123        ActiveCall.client_invoke(@call, @metadata_to_send)
124        @metadata_sent = true
125      end
126    end
127
    # output_metadata provides access to a hash that can be used to
    # save metadata to be sent as trailer
130    def output_metadata
131      @output_metadata ||= {}
132    end
133
134    # cancelled indicates if the call was cancelled
135    def cancelled?
136      !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
137    end
138
139    # multi_req_view provides a restricted view of this ActiveCall for use
140    # in a server client-streaming handler.
141    def multi_req_view
142      MultiReqView.new(self)
143    end
144
145    # single_req_view provides a restricted view of this ActiveCall for use in
146    # a server request-response handler.
147    def single_req_view
148      SingleReqView.new(self)
149    end
150
    # operation provides a restricted view of this ActiveCall for use as
    # an Operation.
153    def operation
154      @op_notifier = Notifier.new
155      Operation.new(self)
156    end
157
158    ##
159    # Returns a restricted view of this ActiveCall for use in interceptors
160    #
161    # @return [InterceptableView]
162    #
163    def interceptable
164      InterceptableView.new(self)
165    end
166
167    def receive_and_check_status
168      batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
169      set_input_stream_done
170      attach_status_results_and_complete_call(batch_result)
171    end
172
173    def attach_status_results_and_complete_call(recv_status_batch_result)
174      unless recv_status_batch_result.status.nil?
175        @call.trailing_metadata = recv_status_batch_result.status.metadata
176      end
177      @call.status = recv_status_batch_result.status
178
179      # The RECV_STATUS in run_batch always succeeds
180      # Check the status for a bad status or failed run batch
181      recv_status_batch_result.check_status
182    end
183
184    # remote_send sends a request to the remote endpoint.
185    #
186    # It blocks until the remote endpoint accepts the message.
187    #
    # @param req [Object, String] the object to send or its marshalled form.
189    # @param marshalled [false, true] indicates if the object is already
190    # marshalled.
191    def remote_send(req, marshalled = false)
192      send_initial_metadata
193      GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
194      payload = marshalled ? req : @marshal.call(req)
195      @call.run_batch(SEND_MESSAGE => payload)
196    end
197
198    # send_status sends a status to the remote endpoint.
199    #
200    # @param code [int] the status code to send
201    # @param details [String] details
    # @param assert_finished [true, false] when true, also waits for the
    # close to be received on the server. Defaults to false.
    # @param metadata [Hash] metadata to send to the server. If a value is a
    # list, multiple metadata for its key are sent
206    def send_status(code = OK, details = '', assert_finished = false,
207                    metadata: {})
208      send_initial_metadata
209      ops = {
210        SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
211      }
212      ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
213      @call.run_batch(ops)
214      set_output_stream_done
215
216      nil
217    end
218
219    # Intended for use on server-side calls when a single request from
220    # the client is expected (i.e., unary and server-streaming RPC types).
221    def read_unary_request
222      req = remote_read
223      set_input_stream_done
224      req
225    end
226
227    def server_unary_response(req, trailing_metadata: {},
228                              code: Core::StatusCodes::OK, details: 'OK')
229      ops = {}
230      @send_initial_md_mutex.synchronize do
231        ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent
232        @metadata_sent = true
233      end
234
235      payload = @marshal.call(req)
236      ops[SEND_MESSAGE] = payload
237      ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new(
238        code, details, trailing_metadata)
239      ops[RECV_CLOSE_ON_SERVER] = nil
240
241      @call.run_batch(ops)
242      set_output_stream_done
243    end
244
245    # remote_read reads a response from the remote endpoint.
246    #
247    # It blocks until the remote endpoint replies with a message or status.
248    # On receiving a message, it returns the response after unmarshalling it.
249    # On receiving a status, it returns nil if the status is OK, otherwise
250    # raising BadStatus
251    def remote_read
252      ops = { RECV_MESSAGE => nil }
253      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
254      batch_result = @call.run_batch(ops)
255      unless @metadata_received
256        @call.metadata = batch_result.metadata
257        @metadata_received = true
258      end
259      get_message_from_batch_result(batch_result)
260    end
261
262    def get_message_from_batch_result(recv_message_batch_result)
263      unless recv_message_batch_result.nil? ||
264             recv_message_batch_result.message.nil?
265        return @unmarshal.call(recv_message_batch_result.message)
266      end
267      GRPC.logger.debug('found nil; the final response has been sent')
268      nil
269    end
270
    # each_remote_read passes each response to the given block or returns an
    # enumerator of the responses if no block is given.
273    # Used to generate the request enumerable for
274    # server-side client-streaming RPC's.
275    #
276    # == Enumerator ==
277    #
278    # * #next blocks until the remote endpoint sends a READ or FINISHED
279    # * for each read, enumerator#next yields the response
    # * on status
    #    * if it is OK, enumerator#next raises StopIteration
    #    * if it is not OK, enumerator#next raises an error
283    #
284    # == Block ==
285    #
286    # * if provided it is executed for each response
287    # * the call blocks until no more responses are provided
288    #
289    # @return [Enumerator] if no block was given
290    def each_remote_read
291      return enum_for(:each_remote_read) unless block_given?
292      begin
293        loop do
294          resp = remote_read
295          break if resp.nil?  # the last response was received
296          yield resp
297        end
298      ensure
299        set_input_stream_done
300      end
301    end
302
303    # each_remote_read_then_finish passes each response to the given block or
304    # returns an enumerator of the responses if no block is given.
305    #
306    # It is like each_remote_read, but it blocks on finishing on detecting
307    # the final message.
308    #
309    # == Enumerator ==
310    #
311    # * #next blocks until the remote endpoint sends a READ or FINISHED
312    # * for each read, enumerator#next yields the response
    # * on status
    #    * if it is OK, enumerator#next raises StopIteration
    #    * if it is not OK, enumerator#next raises GRPC::BadStatus (or
    #      GRPC::Cancelled when the call was cancelled)
316    #
317    # == Block ==
318    #
319    # * if provided it is executed for each response
320    # * the call blocks until no more responses are provided
321    #
322    # @return [Enumerator] if no block was given
323    def each_remote_read_then_finish
324      return enum_for(:each_remote_read_then_finish) unless block_given?
325      loop do
326        resp =
327          begin
328            remote_read
329          rescue GRPC::Core::CallError => e
330            GRPC.logger.warn("In each_remote_read_then_finish: #{e}")
331            nil
332          end
333
334        break if resp.nil?  # the last response was received
335        yield resp
336      end
337
338      receive_and_check_status
339    ensure
340      set_input_stream_done
341    end
342
343    # request_response sends a request to a GRPC server, and returns the
344    # response.
345    #
346    # @param req [Object] the request sent to the server
347    # @param metadata [Hash] metadata to be sent to the server. If a value is
348    # a list, multiple metadata for its key are sent
349    # @return [Object] the response received from the server
350    def request_response(req, metadata: {})
351      raise_error_if_already_executed
352      ops = {
353        SEND_MESSAGE => @marshal.call(req),
354        SEND_CLOSE_FROM_CLIENT => nil,
355        RECV_INITIAL_METADATA => nil,
356        RECV_MESSAGE => nil,
357        RECV_STATUS_ON_CLIENT => nil
358      }
359      @send_initial_md_mutex.synchronize do
360        # Metadata might have already been sent if this is an operation view
361        unless @metadata_sent
362          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
363        end
364        @metadata_sent = true
365      end
366
367      begin
368        batch_result = @call.run_batch(ops)
369        # no need to check for cancellation after a CallError because this
370        # batch contains a RECV_STATUS op
371      ensure
372        set_input_stream_done
373        set_output_stream_done
374      end
375
376      @call.metadata = batch_result.metadata
377      attach_status_results_and_complete_call(batch_result)
378      get_message_from_batch_result(batch_result)
379    end
380
381    # client_streamer sends a stream of requests to a GRPC server, and
382    # returns a single response.
383    #
384    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
385    # #each enumeration protocol. In the simplest case, requests will be an
386    # array of marshallable objects; in typical case it will be an Enumerable
387    # that allows dynamic construction of the marshallable objects.
388    #
389    # @param requests [Object] an Enumerable of requests to send
390    # @param metadata [Hash] metadata to be sent to the server. If a value is
391    # a list, multiple metadata for its key are sent
392    # @return [Object] the response received from the server
    def client_streamer(requests, metadata: {})
      # Raises if a client call was already executed on this object.
      raise_error_if_already_executed
      begin
        # Send initial metadata (a no-op if it already went out), then each
        # request as its own SEND_MESSAGE batch.
        send_initial_metadata(metadata)
        requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
      rescue GRPC::Core::CallError => e
        # Fetch the final status first: a non-OK status (e.g. Cancelled)
        # raises from there; otherwise the original CallError is re-raised.
        receive_and_check_status # check for Cancelled
        raise e
      rescue => e
        # Any other failure: nothing more will be read from this call.
        set_input_stream_done
        raise e
      ensure
        # Whether or not sending succeeded, no more requests are written.
        set_output_stream_done
      end

      # Half-close, then collect the initial metadata, the single response
      # and the final status in one batch.
      batch_result = @call.run_batch(
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      )

      set_input_stream_done

      # Record metadata and status on the call (raising on a bad status)
      # before unmarshalling the response.
      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end
421
422    # server_streamer sends one request to the GRPC server, which yields a
423    # stream of responses.
424    #
425    # responses provides an enumerator over the streamed responses, i.e. it
426    # follows Ruby's #each iteration protocol.  The enumerator blocks while
427    # waiting for each response, stops when the server signals that no
428    # further responses will be supplied.  If the implicit block is provided,
429    # it is executed with each response as the argument and no result is
430    # returned.
431    #
432    # @param req [Object] the request sent to the server
433    # @param metadata [Hash] metadata to be sent to the server. If a value is
434    # a list, multiple metadata for its key are sent
435    # @return [Enumerator|nil] a response Enumerator
    def server_streamer(req, metadata: {})
      # Raises if a client call was already executed on this object.
      raise_error_if_already_executed
      # The single request and the half-close go out in one batch.
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        @call.run_batch(ops)
      rescue GRPC::Core::CallError => e
        # Fetch the final status first: a non-OK status raises from there;
        # otherwise the original CallError is re-raised.
        receive_and_check_status # checks for Cancelled
        raise e
      rescue => e
        # Any other failure: nothing more will be read from this call.
        set_input_stream_done
        raise e
      ensure
        # The request side is finished whether or not the batch succeeded.
        set_output_stream_done
      end

      # Responses are read lazily; the final status is received and checked
      # once the enumerator has yielded the last response.
      replies = enum_for(:each_remote_read_then_finish)
      return replies unless block_given?
      replies.each { |r| yield r }
    end
466
467    # bidi_streamer sends a stream of requests to the GRPC server, and yields
468    # a stream of responses.
469    #
    # This method takes an Enumerable of requests, and returns an enumerable
    # of responses.
472    #
473    # == requests ==
474    #
475    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
476    # #each enumeration protocol. In the simplest case, requests will be an
477    # array of marshallable objects; in typical case it will be an
478    # Enumerable that allows dynamic construction of the marshallable
479    # objects.
480    #
481    # == responses ==
482    #
483    # This is an enumerator of responses.  I.e, its #next method blocks
484    # waiting for the next response.  Also, if at any point the block needs
485    # to consume all the remaining responses, this can be done using #each or
486    # #collect.  Calling #each or #collect should only be done if
487    # the_call#writes_done has been called, otherwise the block will loop
488    # forever.
489    #
490    # @param requests [Object] an Enumerable of requests to send
491    # @param metadata [Hash] metadata to be sent to the server. If a value is
492    # a list, multiple metadata for its key are sent
493    # @return [Enumerator, nil] a response Enumerator
    def bidi_streamer(requests, metadata: {}, &blk)
      # Raises if a client call was already executed on this object.
      raise_error_if_already_executed
      # Metadata might have already been sent if this is an operation view
      begin
        send_initial_metadata(metadata)
      rescue GRPC::Core::CallError => e
        # Receive the final status, mark both directions done, then record
        # and check the status: a non-OK status raises from there, otherwise
        # the original CallError is re-raised.
        batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
        set_input_stream_done
        set_output_stream_done
        attach_status_results_and_complete_call(batch_result)
        raise e
      rescue => e
        # Any other failure: neither direction will be used any more.
        set_input_stream_done
        set_output_stream_done
        raise e
      end

      # The reading and writing itself is delegated to BidiCall.
      bd = BidiCall.new(@call,
                        @marshal,
                        @unmarshal,
                        metadata_received: @metadata_received)

      # The two procs give BidiCall a way to mark the input and output
      # streams of this ActiveCall as done.
      bd.run_on_client(requests,
                       proc { set_input_stream_done },
                       proc { set_output_stream_done },
                       &blk)
    end
521
522    # run_server_bidi orchestrates a BiDi stream processing on a server.
523    #
524    # N.B. gen_each_reply is a func(Enumerable<Requests>)
525    #
526    # It takes an enumerable of requests as an arg, in case there is a
527    # relationship between the stream of requests and the stream of replies.
528    #
    # This does not mean that there must necessarily be one.  E.g., the
    # replies produced by gen_each_reply could ignore the received_msgs
531    #
532    # @param mth [Proc] generates the BiDi stream replies
533    # @param interception_ctx [InterceptionContext]
534    #
535    def run_server_bidi(mth, interception_ctx)
536      view = multi_req_view
537      bidi_call = BidiCall.new(
538        @call,
539        @marshal,
540        @unmarshal,
541        metadata_received: @metadata_received,
542        req_view: view
543      )
544      requests = bidi_call.read_next_loop(proc { set_input_stream_done }, false)
545      interception_ctx.intercept!(
546        :bidi_streamer,
547        call: view,
548        method: mth,
549        requests: requests
550      ) do
551        bidi_call.run_on_server(mth, requests)
552      end
553    end
554
555    # Waits till an operation completes
556    def wait
557      return if @op_notifier.nil?
558      GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
559      @op_notifier.wait
560    end
561
562    # Signals that an operation is done.
563    # Only relevant on the client-side (this is a no-op on the server-side)
564    def op_is_done
565      return if @op_notifier.nil?
566      @op_notifier.notify(self)
567    end
568
569    # Add to the metadata that will be sent from the server.
570    # Fails if metadata has already been sent.
571    # Unused by client calls.
572    def merge_metadata_to_send(new_metadata = {})
573      @send_initial_md_mutex.synchronize do
574        fail('cant change metadata after already sent') if @metadata_sent
575        @metadata_to_send.merge!(new_metadata)
576      end
577    end
578
579    def attach_peer_cert(peer_cert)
580      @peer_cert = peer_cert
581    end
582
583    private
584
    # To be called once the "input stream" has been completely
    # read through (i.e., done reading from client or received status)
587    # note this is idempotent
588    def set_input_stream_done
589      @call_finished_mu.synchronize do
590        @input_stream_done = true
591        maybe_finish_and_close_call_locked
592      end
593    end
594
    # To be called once the "output stream" has been completely
    # sent through (i.e., done sending from client or sent status)
597    # note this is idempotent
598    def set_output_stream_done
599      @call_finished_mu.synchronize do
600        @output_stream_done = true
601        maybe_finish_and_close_call_locked
602      end
603    end
604
605    def maybe_finish_and_close_call_locked
606      return unless @output_stream_done && @input_stream_done
607      return if @call_finished
608      @call_finished = true
609      op_is_done
610      @call.close
611    end
612
613    # Starts the call if not already started
614    # @param metadata [Hash] metadata to be sent to the server. If a value is
615    # a list, multiple metadata for its key are sent
616    def start_call(metadata = {})
617      merge_metadata_to_send(metadata) && send_initial_metadata
618    end
619
620    def raise_error_if_already_executed
621      @client_call_executed_mu.synchronize do
622        if @client_call_executed
623          fail GRPC::Core::CallError, 'attempting to re-run a call'
624        end
625        @client_call_executed = true
626      end
627    end
628
629    def self.view_class(*visible_methods)
630      Class.new do
631        extend ::Forwardable
632        def_delegators :@wrapped, *visible_methods
633
634        # @param wrapped [ActiveCall] the call whose methods are shielded
635        def initialize(wrapped)
636          @wrapped = wrapped
637        end
638      end
639    end
640
641    # SingleReqView limits access to an ActiveCall's methods for use in server
642    # handlers that receive just one request.
643    SingleReqView = view_class(:cancelled?, :deadline, :metadata,
644                               :output_metadata, :peer, :peer_cert,
645                               :send_initial_metadata,
646                               :metadata_to_send,
647                               :merge_metadata_to_send,
648                               :metadata_sent)
649
650    # MultiReqView limits access to an ActiveCall's methods for use in
651    # server client_streamer handlers.
652    MultiReqView = view_class(:cancelled?, :deadline,
653                              :each_remote_read, :metadata, :output_metadata,
654                              :peer, :peer_cert,
655                              :send_initial_metadata,
656                              :metadata_to_send,
657                              :merge_metadata_to_send,
658                              :metadata_sent)
659
660    # Operation limits access to an ActiveCall's methods for use as
661    # a Operation on the client.
662    Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
663                           :metadata, :status, :start_call, :wait, :write_flag,
664                           :write_flag=, :trailing_metadata)
665
666    # InterceptableView further limits access to an ActiveCall's methods
667    # for use in interceptors on the client, exposing only the deadline
668    InterceptableView = view_class(:deadline)
669  end
670end
671