1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14require 'spec_helper'
15
# Reads the test cert files used by the specs.
#
# The files are looked up in the 'testdata' directory located next to the
# directory that contains this file.
#
# @return [Array<String>] the contents of ca.pem, server1.key and
#   server1.pem, in that order
# @raise [SystemCallError] if any of the files cannot be read
def load_test_certs
  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
  files = ['ca.pem', 'server1.key', 'server1.pem']
  # File.read opens, reads and closes in one step, so no handle is left
  # open for the garbage collector to clean up.
  files.map { |f| File.read(File.join(test_root, f)) }
end
21
# Pairs each wanted metadata hash with the received hash at the same
# position and expects every wanted key to carry the wanted value in the
# received hash. Keys that appear only in the received hash are not checked.
def check_md(wanted_md, received_md)
  wanted_md.zip(received_md).each do |wanted, received|
    wanted.each_pair { |k, v| expect(received[k]).to eq(v) }
  end
end
29
# Service class that mixes in GenericService but declares no rpc methods.
class EmptyService
  include(GRPC::GenericService)
end
34
# Service class that declares :an_rpc but defines no method to handle it.
class NoRpcImplementation
  include(GRPC::GenericService)
  rpc(:an_rpc, EchoMsg, EchoMsg)
end
40
# Service whose only rpc always raises a GRPC::BadStatus built from the
# code, details and metadata exposed through its readers.
class FailingService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  attr_reader :details, :code, :md

  # The argument is accepted but not used.
  def initialize(_default_var = 'ignored')
    @code = 101
    @details = 'app error'
    @md = { 'failed_method' => 'an_rpc' }
  end

  # Never returns a response; raises the canned status instead.
  def an_rpc(_req, _call)
    raise GRPC::BadStatus.new(code, details, md)
  end
end

# Client stub class obtained from FailingService.rpc_stub_class.
FailingStub = FailingService.rpc_stub_class
59
# Service that sleeps for `delay` seconds before echoing the request, and
# records the metadata of each call whose metadata is not nil.
class SlowService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  attr_reader :received_md, :delay

  # The argument is accepted but not used.
  def initialize(_default_var = 'ignored')
    @received_md = []
    @delay = 0.25
  end

  # Sleeps, remembers the call metadata, then returns the request unchanged
  # as the response.
  def an_rpc(req, call)
    GRPC.logger.info("starting a slow #{@delay} rpc")
    sleep(@delay)
    unless call.metadata.nil?
      @received_md << call.metadata
    end
    req
  end
end

# Client stub class obtained from SlowService.rpc_stub_class.
SlowStub = SlowService.rpc_stub_class
80
# Service whose rpc is driven by two callables supplied by the test, so that
# a client can cancel the call at a known point in the handler.
class SynchronizedCancellationService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  attr_reader :received_md, :delay

  # notify_request_received is called with the request once the handler has
  # it, which lets the client go ahead and cancel.
  # wait_until_rpc_cancelled is called next, and is expected to block until
  # the client has cancelled the current RPC.
  def initialize(notify_request_received, wait_until_rpc_cancelled)
    @wait_until_rpc_cancelled = wait_until_rpc_cancelled
    @notify_request_received = notify_request_received
  end

  # Runs both callbacks in order, then echoes the request as the response.
  def an_rpc(req, _call)
    GRPC.logger.info('starting a synchronusly cancelled rpc')
    @notify_request_received.call(req)
    @wait_until_rpc_cancelled.call
    req
  end
end

# Client stub class obtained from
# SynchronizedCancellationService.rpc_stub_class.
SynchronizedCancellationStub = SynchronizedCancellationService.rpc_stub_class
106
# Service that keeps a reference to the server-side call object of the one
# rpc it handles, so a spec can poke at that object after the call is over.
# An instance fails if it is asked to handle a second rpc.
class CheckCallAfterFinishedService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
  rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
  rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
  attr_reader :server_side_call

  # Unary: echoes the request.
  def an_rpc(req, call)
    hold_on_to(call)
    req
  end

  # Client streaming: drains the incoming requests so the call can complete,
  # then answers with an empty message.
  def a_client_streaming_rpc(call)
    hold_on_to(call)
    call.each_remote_read.each { |r| GRPC.logger.info(r) }
    EchoMsg.new
  end

  # Server streaming: answers with two empty messages.
  def a_server_streaming_rpc(_, call)
    hold_on_to(call)
    [EchoMsg.new, EchoMsg.new]
  end

  # Bidi: logs every incoming request, then answers with two empty messages.
  def a_bidi_rpc(requests, call)
    hold_on_to(call)
    requests.each { |r| GRPC.logger.info(r) }
    [EchoMsg.new, EchoMsg.new]
  end

  private

  # Stores the call object, refusing to overwrite one that is already stored.
  def hold_on_to(call)
    raise 'shouldnt reuse service' unless @server_side_call.nil?
    @server_side_call = call
  end
end

# Client stub class obtained from CheckCallAfterFinishedService.rpc_stub_class.
CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
147
# Service with one bidi streaming rpc whose handler returns a plain String
# instead of something enumerable.
class BidiService
  include GRPC::GenericService
  rpc :server_sends_bad_input, stream(EchoMsg), stream(EchoMsg)

  # Ignores both arguments and returns a non-enumerable response.
  def server_sends_bad_input(_requests, _call)
    'bad response. (not an enumerable, client sees an error)'
  end
end

# Client stub class obtained from BidiService.rpc_stub_class.
BidiStub = BidiService.rpc_stub_class
159
160describe GRPC::RpcServer do
161  RpcServer = GRPC::RpcServer
162  StatusCodes = GRPC::Core::StatusCodes
163
164  before(:each) do
165    @method = 'an_rpc_method'
166    @pass = 0
167    @fail = 1
168    @noop = proc { |x| x }
169  end
170
171  describe '#new' do
172    it 'can be created with just some args' do
173      opts = { server_args: { a_channel_arg: 'an_arg' } }
174      blk = proc do
175        new_rpc_server_for_testing(**opts)
176      end
177      expect(&blk).not_to raise_error
178    end
179
180    it 'cannot be created with invalid ServerCredentials' do
181      blk = proc do
182        opts = {
183          server_args: { a_channel_arg: 'an_arg' },
184          creds: Object.new
185        }
186        new_rpc_server_for_testing(**opts)
187      end
188      expect(&blk).to raise_error
189    end
190  end
191
192  describe '#stopped?' do
193    before(:each) do
194      opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1.5 }
195      @srv = new_rpc_server_for_testing(**opts)
196      @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
197    end
198
199    it 'starts out false' do
200      expect(@srv.stopped?).to be(false)
201    end
202
203    it 'stays false after the server starts running', server: true do
204      @srv.handle(EchoService)
205      t = Thread.new { @srv.run }
206      @srv.wait_till_running
207      expect(@srv.stopped?).to be(false)
208      @srv.stop
209      t.join
210    end
211
212    it 'is true after a running server is stopped', server: true do
213      @srv.handle(EchoService)
214      t = Thread.new { @srv.run }
215      @srv.wait_till_running
216      @srv.stop
217      t.join
218      expect(@srv.stopped?).to be(true)
219    end
220  end
221
222  describe '#running?' do
223    it 'starts out false' do
224      opts = {
225        server_args: { a_channel_arg: 'an_arg' }
226      }
227      r = new_rpc_server_for_testing(**opts)
228      expect(r.running?).to be(false)
229    end
230
231    it 'is false if run is called with no services registered', server: true do
232      opts = {
233        server_args: { a_channel_arg: 'an_arg' },
234        poll_period: 2
235      }
236      r = new_rpc_server_for_testing(**opts)
237      r.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
238      expect { r.run }.to raise_error(RuntimeError)
239    end
240
241    it 'is true after run is called with a registered service' do
242      opts = {
243        server_args: { a_channel_arg: 'an_arg' },
244        poll_period: 2.5
245      }
246      r = new_rpc_server_for_testing(**opts)
247      r.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
248      r.handle(EchoService)
249      t = Thread.new { r.run }
250      r.wait_till_running
251      expect(r.running?).to be(true)
252      r.stop
253      t.join
254    end
255  end
256
257  describe '#handle' do
258    before(:each) do
259      @opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1 }
260      @srv = new_rpc_server_for_testing(**@opts)
261      @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
262    end
263
264    it 'raises if #run has already been called' do
265      @srv.handle(EchoService)
266      t = Thread.new { @srv.run }
267      @srv.wait_till_running
268      expect { @srv.handle(EchoService) }.to raise_error
269      @srv.stop
270      t.join
271    end
272
273    it 'raises if the server has been run and stopped' do
274      @srv.handle(EchoService)
275      t = Thread.new { @srv.run }
276      @srv.wait_till_running
277      @srv.stop
278      t.join
279      expect { @srv.handle(EchoService) }.to raise_error
280    end
281
282    it 'raises if the service does not include GenericService ' do
283      expect { @srv.handle(Object) }.to raise_error
284    end
285
286    it 'raises if the service does not declare any rpc methods' do
287      expect { @srv.handle(EmptyService) }.to raise_error
288    end
289
290    it 'raises if a handler method is already registered' do
291      @srv.handle(EchoService)
292      expect { r.handle(EchoService) }.to raise_error
293    end
294  end
295
296  describe '#run' do
297    let(:client_opts) { { channel_override: @ch } }
298    let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc }
299    let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) }
300
301    context 'with no connect_metadata' do
302      before(:each) do
303        server_opts = {
304          poll_period: 1
305        }
306        @srv = new_rpc_server_for_testing(**server_opts)
307        server_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
308        @host = "localhost:#{server_port}"
309        @ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure)
310      end
311
312      it 'should return NOT_FOUND status on unknown methods', server: true do
313        @srv.handle(EchoService)
314        t = Thread.new { @srv.run }
315        @srv.wait_till_running
316        req = EchoMsg.new
317        blk = proc do
318          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
319                                      **client_opts)
320          stub.request_response('/unknown', req, marshal, unmarshal)
321        end
322        expect(&blk).to raise_error GRPC::BadStatus
323        @srv.stop
324        t.join
325      end
326
327      it 'should return UNIMPLEMENTED on unimplemented methods', server: true do
328        @srv.handle(NoRpcImplementation)
329        t = Thread.new { @srv.run }
330        @srv.wait_till_running
331        req = EchoMsg.new
332        blk = proc do
333          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
334                                      **client_opts)
335          stub.request_response('/an_rpc', req, marshal, unmarshal)
336        end
337        expect(&blk).to raise_error do |error|
338          expect(error).to be_a(GRPC::BadStatus)
339          expect(error.code).to be(GRPC::Core::StatusCodes::UNIMPLEMENTED)
340        end
341        @srv.stop
342        t.join
343      end
344
345      it 'should handle multiple sequential requests', server: true do
346        @srv.handle(EchoService)
347        t = Thread.new { @srv.run }
348        @srv.wait_till_running
349        req = EchoMsg.new
350        n = 5  # arbitrary
351        stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
352        n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) }
353        @srv.stop
354        t.join
355      end
356
357      it 'should receive metadata sent as rpc keyword args', server: true do
358        service = EchoService.new
359        @srv.handle(service)
360        t = Thread.new { @srv.run }
361        @srv.wait_till_running
362        req = EchoMsg.new
363        stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
364        expect(stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }))
365          .to be_a(EchoMsg)
366        wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
367        check_md(wanted_md, service.received_md)
368        @srv.stop
369        t.join
370      end
371
372      it 'should receive metadata if a deadline is specified', server: true do
373        service = SlowService.new
374        @srv.handle(service)
375        t = Thread.new { @srv.run }
376        @srv.wait_till_running
377        req = EchoMsg.new
378        stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
379        timeout = service.delay + 1.0
380        deadline = GRPC::Core::TimeConsts.from_relative_time(timeout)
381        resp = stub.an_rpc(req,
382                           deadline: deadline,
383                           metadata: { k1: 'v1', k2: 'v2' })
384        expect(resp).to be_a(EchoMsg)
385        wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
386        check_md(wanted_md, service.received_md)
387        @srv.stop
388        t.join
389      end
390
391      it 'should handle cancellation correctly', server: true do
392        request_received = false
393        request_received_mu = Mutex.new
394        request_received_cv = ConditionVariable.new
395        notify_request_received = proc do |req|
396          request_received_mu.synchronize do
397            fail 'req is nil' if req.nil?
398            expect(req.is_a?(EchoMsg)).to be true
399            fail 'test bug - already set' if request_received
400            request_received = true
401            request_received_cv.signal
402          end
403        end
404
405        rpc_cancelled = false
406        rpc_cancelled_mu = Mutex.new
407        rpc_cancelled_cv = ConditionVariable.new
408        wait_until_rpc_cancelled = proc do
409          rpc_cancelled_mu.synchronize do
410            loop do
411              break if rpc_cancelled
412              rpc_cancelled_cv.wait(rpc_cancelled_mu)
413            end
414          end
415        end
416
417        service = SynchronizedCancellationService.new(notify_request_received,
418                                                      wait_until_rpc_cancelled)
419        @srv.handle(service)
420        srv_thd = Thread.new { @srv.run }
421        @srv.wait_till_running
422        req = EchoMsg.new
423        stub = SynchronizedCancellationStub.new(@host,
424                                                :this_channel_is_insecure,
425                                                **client_opts)
426        op = stub.an_rpc(req, return_op: true)
427
428        client_thd = Thread.new do
429          expect { op.execute }.to raise_error GRPC::Cancelled
430        end
431
432        request_received_mu.synchronize do
433          loop do
434            break if request_received
435            request_received_cv.wait(request_received_mu)
436          end
437        end
438
439        op.cancel
440
441        rpc_cancelled_mu.synchronize do
442          fail 'test bug - already set' if rpc_cancelled
443          rpc_cancelled = true
444          rpc_cancelled_cv.signal
445        end
446
447        client_thd.join
448        @srv.stop
449        srv_thd.join
450      end
451
452      it 'should handle multiple parallel requests', server: true do
453        @srv.handle(EchoService)
454        t = Thread.new { @srv.run }
455        @srv.wait_till_running
456        req, q = EchoMsg.new, Queue.new
457        n = 5  # arbitrary
458        threads = [t]
459        n.times do
460          threads << Thread.new do
461            stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
462            q << stub.an_rpc(req)
463          end
464        end
465        n.times { expect(q.pop).to be_a(EchoMsg) }
466        @srv.stop
467        threads.each(&:join)
468      end
469
470      it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do
471        opts = {
472          server_args: { a_channel_arg: 'an_arg' },
473          pool_size: 2,
474          poll_period: 1,
475          max_waiting_requests: 1
476        }
477        alt_srv = new_rpc_server_for_testing(**opts)
478        alt_srv.handle(SlowService)
479        alt_port = alt_srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
480        alt_host = "0.0.0.0:#{alt_port}"
481        t = Thread.new { alt_srv.run }
482        alt_srv.wait_till_running
483        req = EchoMsg.new
484        n = 20 # arbitrary, use as many to ensure the server pool is exceeded
485        threads = []
486        one_failed_as_unavailable = false
487        n.times do
488          threads << Thread.new do
489            stub = SlowStub.new(alt_host, :this_channel_is_insecure)
490            begin
491              stub.an_rpc(req)
492            rescue GRPC::ResourceExhausted
493              one_failed_as_unavailable = true
494            end
495          end
496        end
497        threads.each(&:join)
498        alt_srv.stop
499        t.join
500        expect(one_failed_as_unavailable).to be(true)
501      end
502
503      it 'should send a status UNKNOWN with a relevant message when the' \
504        'servers response stream is not an enumerable' do
505        @srv.handle(BidiService)
506        t = Thread.new { @srv.run }
507        @srv.wait_till_running
508        stub = BidiStub.new(@host, :this_channel_is_insecure, **client_opts)
509        responses = stub.server_sends_bad_input([])
510        exception = nil
511        begin
512          responses.each { |r| r }
513        rescue GRPC::Unknown => e
514          exception = e
515        end
516        # Erroneous responses sent from the server handler should cause an
517        # exception on the client with relevant info.
518        expected_details = 'NoMethodError: undefined method `each\' for '\
519          '"bad response. (not an enumerable, client sees an error)"'
520
521        expect(exception.inspect.include?(expected_details)).to be true
522        @srv.stop
523        t.join
524      end
525    end
526
527    context 'with connect metadata' do
528      let(:test_md_proc) do
529        proc do |mth, md|
530          res = md.clone
531          res['method'] = mth
532          res['connect_k1'] = 'connect_v1'
533          res
534        end
535      end
536      before(:each) do
537        server_opts = {
538          poll_period: 1,
539          connect_md_proc: test_md_proc
540        }
541        @srv = new_rpc_server_for_testing(**server_opts)
542        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
543        @alt_host = "0.0.0.0:#{alt_port}"
544      end
545
546      it 'should send connect metadata to the client', server: true do
547        service = EchoService.new
548        @srv.handle(service)
549        t = Thread.new { @srv.run }
550        @srv.wait_till_running
551        req = EchoMsg.new
552        stub = EchoStub.new(@alt_host, :this_channel_is_insecure)
553        op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true)
554        expect(op.metadata).to be nil
555        expect(op.execute).to be_a(EchoMsg)
556        wanted_md = {
557          'k1' => 'v1',
558          'k2' => 'v2',
559          'method' => '/EchoService/an_rpc',
560          'connect_k1' => 'connect_v1'
561        }
562        wanted_md.each do |key, value|
563          GRPC.logger.info("key: #{key}")
564          expect(op.metadata[key]).to eq(value)
565        end
566        @srv.stop
567        t.join
568      end
569    end
570
571    context 'with trailing metadata' do
572      before(:each) do
573        server_opts = {
574          poll_period: 1
575        }
576        @srv = new_rpc_server_for_testing(**server_opts)
577        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
578        @alt_host = "0.0.0.0:#{alt_port}"
579      end
580
581      it 'should be added to BadStatus when requests fail', server: true do
582        service = FailingService.new
583        @srv.handle(service)
584        t = Thread.new { @srv.run }
585        @srv.wait_till_running
586        req = EchoMsg.new
587        stub = FailingStub.new(@alt_host, :this_channel_is_insecure)
588        blk = proc { stub.an_rpc(req) }
589
590        # confirm it raise the expected error
591        expect(&blk).to raise_error GRPC::BadStatus
592
593        # call again and confirm exception contained the trailing metadata.
594        begin
595          blk.call
596        rescue GRPC::BadStatus => e
597          expect(e.code).to eq(service.code)
598          expect(e.details).to eq(service.details)
599          expect(e.metadata).to eq(service.md)
600        end
601        @srv.stop
602        t.join
603      end
604
605      it 'should be received by the client', server: true do
606        wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' }
607        service = EchoService.new(k1: 'out_v1', k2: 'out_v2')
608        @srv.handle(service)
609        t = Thread.new { @srv.run }
610        @srv.wait_till_running
611        req = EchoMsg.new
612        stub = EchoStub.new(@alt_host, :this_channel_is_insecure)
613        op = stub.an_rpc(req, return_op: true, metadata: { k1: 'v1', k2: 'v2' })
614        expect(op.metadata).to be nil
615        expect(op.execute).to be_a(EchoMsg)
616        expect(op.trailing_metadata).to eq(wanted_trailers)
617        @srv.stop
618        t.join
619      end
620    end
621
622    context 'when call objects are used after calls have completed' do
623      before(:each) do
624        server_opts = {
625          poll_period: 1
626        }
627        @srv = new_rpc_server_for_testing(**server_opts)
628        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
629        @alt_host = "0.0.0.0:#{alt_port}"
630
631        @service = CheckCallAfterFinishedService.new
632        @srv.handle(@service)
633        @srv_thd  = Thread.new { @srv.run }
634        @srv.wait_till_running
635      end
636
637      # check that the server-side call is still in a usable state even
638      # after it has finished
639      def check_single_req_view_of_finished_call(call)
640        common_check_of_finished_server_call(call)
641
642        expect(call.peer).to be_a(String)
643        expect(call.peer_cert).to be(nil)
644      end
645
646      def check_multi_req_view_of_finished_call(call)
647        common_check_of_finished_server_call(call)
648
649        expect do
650          call.each_remote_read.each { |r| p r }
651        end.to raise_error(GRPC::Core::CallError)
652      end
653
654      def common_check_of_finished_server_call(call)
655        expect do
656          call.merge_metadata_to_send({})
657        end.to raise_error(RuntimeError)
658
659        expect do
660          call.send_initial_metadata
661        end.to_not raise_error
662
663        expect(call.cancelled?).to be(false)
664        expect(call.metadata).to be_a(Hash)
665        expect(call.metadata['user-agent']).to be_a(String)
666
667        expect(call.metadata_sent).to be(true)
668        expect(call.output_metadata).to eq({})
669        expect(call.metadata_to_send).to eq({})
670        expect(call.deadline.is_a?(Time)).to be(true)
671      end
672
673      it 'should not crash when call used after an unary call is finished' do
674        req = EchoMsg.new
675        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
676                                                     :this_channel_is_insecure)
677        resp = stub.an_rpc(req)
678        expect(resp).to be_a(EchoMsg)
679        @srv.stop
680        @srv_thd.join
681
682        check_single_req_view_of_finished_call(@service.server_side_call)
683      end
684
685      it 'should not crash when call used after client streaming finished' do
686        requests = [EchoMsg.new, EchoMsg.new]
687        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
688                                                     :this_channel_is_insecure)
689        resp = stub.a_client_streaming_rpc(requests)
690        expect(resp).to be_a(EchoMsg)
691        @srv.stop
692        @srv_thd.join
693
694        check_multi_req_view_of_finished_call(@service.server_side_call)
695      end
696
697      it 'should not crash when call used after server streaming finished' do
698        req = EchoMsg.new
699        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
700                                                     :this_channel_is_insecure)
701        responses = stub.a_server_streaming_rpc(req)
702        responses.each do |r|
703          expect(r).to be_a(EchoMsg)
704        end
705        @srv.stop
706        @srv_thd.join
707
708        check_single_req_view_of_finished_call(@service.server_side_call)
709      end
710
711      it 'should not crash when call used after a bidi call is finished' do
712        requests = [EchoMsg.new, EchoMsg.new]
713        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
714                                                     :this_channel_is_insecure)
715        responses = stub.a_bidi_rpc(requests)
716        responses.each do |r|
717          expect(r).to be_a(EchoMsg)
718        end
719        @srv.stop
720        @srv_thd.join
721
722        check_multi_req_view_of_finished_call(@service.server_side_call)
723      end
724    end
725  end
726end
727