1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require 'spec_helper'
16
17include GRPC::Core::StatusCodes
18
19describe GRPC::ActiveCall do
# Short aliases for the gRPC constants this spec refers to repeatedly.
core = GRPC::Core
ActiveCall = GRPC::ActiveCall
Call = core::Call
CallOps = core::CallOps
WriteFlags = core::WriteFlags
24
# Returns a new Struct::Status built from the OK code and the string 'OK'.
def ok_status
  status_fields = [OK, 'OK']
  Struct::Status.new(*status_fields)
end
28
# Finishes a call using raw batches: the client sends its close, the server
# receives the close and sends ok_status, and the client then receives the
# status.
def send_and_receive_close_and_status(client_call, server_call)
  client_close = { CallOps::SEND_CLOSE_FROM_CLIENT => nil }
  client_call.run_batch(client_close)

  server_finish = {
    CallOps::RECV_CLOSE_ON_SERVER => nil,
    CallOps::SEND_STATUS_FROM_SERVER => ok_status
  }
  server_call.run_batch(server_finish)

  client_recv_status = { CallOps::RECV_STATUS_ON_CLIENT => nil }
  client_call.run_batch(client_recv_status)
end
35
# Returns whatever the given object holds in its @call instance variable,
# bypassing the object's public interface.
def inner_call_of_active_call(active_call)
  active_call.instance_variable_get('@call')
end
39
# Per-example setup: builds the identity proc used as marshal/unmarshal
# function, starts a core test server bound to '0.0.0.0:0', and opens an
# insecure channel to the port that add_http2_port reports.
before(:each) do
  @pass_through = proc { |payload| payload }
  @server = new_core_server_for_testing(nil)
  bound_port = @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
  @server.start
  target = "0.0.0.0:#{bound_port}"
  @ch = GRPC::Core::Channel.new(target, nil, :this_channel_is_insecure)
end
49
# Per-example teardown: asks the server to shut down, bounded by a fresh
# deadline, and then closes it.
after(:each) do
  shutdown_deadline = deadline
  @server.shutdown_and_notify(shutdown_deadline)
  @server.close
end
54
# Checks that each restricted view of an ActiveCall exposes the methods that
# handlers and interceptors rely on.
#
# Object#methods returns Symbols, so the wanted names are Symbols, and each
# expectation carries a matcher. The earlier form compared comma-suffixed
# Strings and called `expect(...)` without a matcher, so it could never fail.
#
# NOTE(review): the earlier lists also named `shutdown` and
# `initial_metadata_sent`. Neither name is used anywhere else in this spec and
# they do not appear to be delegated by the views; `metadata_sent` (used
# elsewhere in this spec) is checked instead. Confirm against the view_class
# definitions in ActiveCall.
describe 'restricted view methods' do
  before(:each) do
    call = make_test_call
    ActiveCall.client_invoke(call)
    @client_call = ActiveCall.new(call, @pass_through,
                                  @pass_through, deadline)
  end

  describe '#multi_req_view' do
    it 'exposes a fixed subset of the ActiveCall.methods' do
      want = %i(cancelled? deadline each_remote_read metadata
                peer peer_cert send_initial_metadata metadata_sent)
      v = @client_call.multi_req_view
      want.each do |w|
        expect(v.methods).to include(w)
      end
    end
  end

  describe '#single_req_view' do
    it 'exposes a fixed subset of the ActiveCall.methods' do
      want = %i(cancelled? deadline metadata send_initial_metadata
                metadata_to_send merge_metadata_to_send metadata_sent)
      v = @client_call.single_req_view
      want.each do |w|
        expect(v.methods).to include(w)
      end
    end
  end

  describe '#interceptable' do
    it 'exposes a fixed subset of the ActiveCall.methods' do
      want = %i(deadline)
      v = @client_call.interceptable
      want.each do |w|
        expect(v.methods).to include(w)
      end
    end
  end
end
97
# Exercises ActiveCall#remote_send from the client side and checks what the
# server reads, including the marshal function and the supported write flags.
describe '#remote_send' do
  it 'allows a client to send a payload to the server', test: true do
    call = make_test_call
    ActiveCall.client_invoke(call)
    client_call = ActiveCall.new(call, @pass_through,
                                 @pass_through, deadline)
    msg = 'message is a string'
    client_call.remote_send(msg)

    # check that the new rpc was received by the server
    recvd_rpc = @server.request_call
    expect(recvd_rpc).to_not eq nil
    recvd_call = recvd_rpc.call

    # Accept the call, and verify that the server reads the request ok.
    server_call = ActiveCall.new(recvd_call, @pass_through,
                                 @pass_through, deadline,
                                 metadata_received: true,
                                 started: false)
    expect(server_call.remote_read).to eq(msg)
    # finish the call
    server_call.send_initial_metadata
    call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
    send_and_receive_close_and_status(call, recvd_call)
  end

  it 'marshals the payload using the marshal func' do
    call = make_test_call
    ActiveCall.client_invoke(call)
    marshal = proc { |x| 'marshalled:' + x }
    client_call = ActiveCall.new(call, marshal, @pass_through, deadline)
    msg = 'message is a string'
    client_call.remote_send(msg)

    # confirm that the message was marshalled
    recvd_rpc = @server.request_call
    recvd_call = recvd_rpc.call
    server_ops = {
      CallOps::SEND_INITIAL_METADATA => nil
    }
    recvd_call.run_batch(server_ops)
    server_call = ActiveCall.new(recvd_call, @pass_through,
                                 @pass_through, deadline,
                                 metadata_received: true)
    expect(server_call.remote_read).to eq('marshalled:' + msg)
    # finish the call
    call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
    send_and_receive_close_and_status(call, recvd_call)
  end

  # Frozen so the shared list cannot be mutated by another example or file.
  TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS].freeze
  TEST_WRITE_FLAGS.each do |f|
    it "successfully makes calls with write_flag set to #{f}" do
      call = make_test_call
      ActiveCall.client_invoke(call)
      marshal = proc { |x| 'marshalled:' + x }
      client_call = ActiveCall.new(call, marshal,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.write_flag = f
      client_call.remote_send(msg)
      # flush the message in case writes are set to buffered; compare against
      # the named flag rather than its numeric value
      if f == WriteFlags::BUFFER_HINT
        call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      end

      # confirm that the message was marshalled
      recvd_rpc = @server.request_call
      recvd_call = recvd_rpc.call
      server_ops = {
        CallOps::SEND_INITIAL_METADATA => nil
      }
      recvd_call.run_batch(server_ops)
      server_call = ActiveCall.new(recvd_call, @pass_through,
                                   @pass_through, deadline,
                                   metadata_received: true)
      expect(server_call.remote_read).to eq('marshalled:' + msg)
      # finish the call
      server_call.send_status(OK, '', true)
      client_call.receive_and_check_status
    end
  end
end
179
# Covers when an ActiveCall sends its initial metadata: implicitly ahead of
# the first message, or explicitly through #send_initial_metadata, depending
# on whether the call was constructed as already started.
describe 'sending initial metadata', send_initial_metadata: true do
  it 'sends metadata before sending a message if it hasnt been sent yet' do
    core_call = make_test_call
    @client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                  deadline, started: false)

    expect(@client_call.metadata_sent).to eq(false)
    md = { key: 'dummy_val', other: 'other_val' }
    @client_call.merge_metadata_to_send(md)

    payload = 'dummy message'
    sends_metadata = hash_including(CallOps::SEND_INITIAL_METADATA => md)
    sends_payload = hash_including(CallOps::SEND_MESSAGE => payload)
    expect(core_call).to receive(:run_batch).with(sends_metadata).once
    expect(core_call).to receive(:run_batch).with(sends_payload).once

    @client_call.remote_send(payload)

    expect(@client_call.metadata_sent).to eq(true)
  end

  it 'doesnt send metadata if it thinks its already been sent' do
    core_call = make_test_call
    @client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                  deadline)
    expect(@client_call.metadata_sent).to eql(true)

    any_metadata_send = hash_including(CallOps::SEND_INITIAL_METADATA)
    expect(core_call).to receive(:run_batch).with(any_metadata_send).never

    @client_call.remote_send('test message')
  end

  it 'sends metadata if it is explicitly sent and ok to do so' do
    core_call = make_test_call
    @client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                  deadline, started: false)
    expect(@client_call.metadata_sent).to eql(false)

    md = { test_key: 'val' }
    @client_call.merge_metadata_to_send(md)
    expect(@client_call.metadata_to_send).to eq(md)

    sends_metadata = hash_including(CallOps::SEND_INITIAL_METADATA => md)
    expect(core_call).to receive(:run_batch).with(sends_metadata).once
    @client_call.send_initial_metadata
  end

  it 'explicit sending does nothing if metadata has already been sent' do
    core_call = make_test_call
    @client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                  deadline)
    expect(@client_call.metadata_sent).to eql(true)

    expect { @client_call.send_initial_metadata }.to_not raise_error
  end
end
264
# Covers ActiveCall#merge_metadata_to_send: merging into metadata that is
# still pending, and rejecting changes once the call counts as started.
describe '#merge_metadata_to_send', merge_metadata_to_send: true do
  it 'adds to existing metadata when there is existing metadata to send' do
    initial_md = { k1: 'key1_val', k2: 'key2_val', k3: 'key3_val' }
    @client_call = ActiveCall.new(make_test_call,
                                  @pass_through, @pass_through, deadline,
                                  started: false,
                                  metadata_to_send: initial_md)
    expect(@client_call.metadata_to_send).to eq(initial_md)

    # k3 is overwritten and k4 is added
    @client_call.merge_metadata_to_send(k3: 'key3_new_val', k4: 'key4_val')
    merged_md = {
      k1: 'key1_val',
      k2: 'key2_val',
      k3: 'key3_new_val',
      k4: 'key4_val'
    }
    expect(@client_call.metadata_to_send).to eq(merged_md)

    # a second merge keeps accumulating
    @client_call.merge_metadata_to_send(k5: 'key5_val')
    merged_md[:k5] = 'key5_val'
    expect(@client_call.metadata_to_send).to eq(merged_md)
  end

  it 'fails when initial metadata has already been sent' do
    @client_call = ActiveCall.new(make_test_call,
                                  @pass_through, @pass_through, deadline,
                                  started: true)
    expect(@client_call.metadata_sent).to eq(true)

    # NOTE(review): a bare raise_error accepts any exception; consider naming
    # the expected error class once it is confirmed against ActiveCall.
    expect do
      @client_call.merge_metadata_to_send(k1: 'key1_val')
    end.to raise_error
  end
end
318
# Covers ActiveCall.client_invoke: metadata handed to it must show up on the
# rpc that the server receives.
describe '#client_invoke' do
  it 'sends metadata to the server when present' do
    client_core_call = make_test_call
    sent_md = { k1: 'v1', k2: 'v2' }
    ActiveCall.client_invoke(client_core_call, sent_md)

    rpc = @server.request_call
    server_core_call = rpc.call
    expect(server_core_call).to_not be_nil
    expect(rpc.metadata).to_not be_nil
    # the received metadata is looked up with String keys
    { 'k1' => 'v1', 'k2' => 'v2' }.each do |key, value|
      expect(rpc.metadata[key]).to eq(value)
    end

    # finish the call
    server_core_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})
    client_core_call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
    send_and_receive_close_and_status(client_core_call, server_core_call)
  end
end
336
# Covers ActiveCall#send_status on a server call that has not sent anything.
describe '#send_status', send_status: true do
  it 'works when no metadata or messages have been sent yet' do
    client_core_call = make_test_call
    ActiveCall.client_invoke(client_core_call)

    rpc = @server.request_call
    server_call = ActiveCall.new(rpc.call, @pass_through, @pass_through,
                                 deadline, started: false)

    expect(server_call.metadata_sent).to eq(false)
    expect { server_call.send_status(OK) }.to_not raise_error
  end
end
355
# Covers ActiveCall#remote_read on the client: reading server replies,
# picking up the server's initial metadata, seeing nil ahead of an OK status,
# and applying the unmarshal function.
describe '#remote_read', remote_read: true do
  it 'reads the response sent by a server' do
    core_call = make_test_call
    ActiveCall.client_invoke(core_call)
    client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                 deadline)
    request = 'message is a string'
    client_call.remote_send(request)

    server_call = expect_server_to_receive(request)
    server_call.remote_send('server_response')
    expect(client_call.remote_read).to eq('server_response')

    server_core_call = inner_call_of_active_call(server_call)
    send_and_receive_close_and_status(core_call, server_core_call)
  end

  it 'saves no metadata when the server adds no metadata' do
    core_call = make_test_call
    ActiveCall.client_invoke(core_call)
    client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                 deadline)
    request = 'message is a string'
    client_call.remote_send(request)

    server_call = expect_server_to_receive(request)
    server_call.remote_send('ignore me')

    # metadata is unset until the first read and empty afterwards
    expect(client_call.metadata).to be_nil
    client_call.remote_read
    expect(client_call.metadata).to eq({})

    server_core_call = inner_call_of_active_call(server_call)
    send_and_receive_close_and_status(core_call, server_core_call)
  end

  it 'saves metadata add by the server' do
    core_call = make_test_call
    ActiveCall.client_invoke(core_call)
    client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                 deadline)
    request = 'message is a string'
    client_call.remote_send(request)

    server_call = expect_server_to_receive(request, k1: 'v1', k2: 'v2')
    server_call.remote_send('ignore me')

    # metadata is unset until the first read; afterwards it is compared
    # against a String-keyed hash
    expect(client_call.metadata).to be_nil
    client_call.remote_read
    expect(client_call.metadata).to eq('k1' => 'v1', 'k2' => 'v2')

    server_core_call = inner_call_of_active_call(server_call)
    send_and_receive_close_and_status(core_call, server_core_call)
  end

  it 'get a status from server when nothing else sent from server' do
    client_core_call = make_test_call
    ActiveCall.client_invoke(client_core_call)

    server_core_call = @server.request_call.call
    server_call = ActiveCall.new(server_core_call, @pass_through,
                                 @pass_through, deadline, started: false)
    server_call.send_status(OK, 'OK')

    # Check that we can receive initial metadata and a status
    client_core_call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
    status_result =
      client_core_call.run_batch(CallOps::RECV_STATUS_ON_CLIENT => nil)

    expect(status_result.status.code).to eq(OK)
  end

  it 'get a nil msg before a status when an OK status is sent' do
    core_call = make_test_call
    ActiveCall.client_invoke(core_call)
    client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                 deadline)
    request = 'message is a string'
    client_call.remote_send(request)
    core_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)

    server_call = expect_server_to_receive(request)
    server_call.remote_send('server_response')
    server_call.send_status(OK, 'OK')

    expect(client_call.remote_read).to eq('server_response')
    expect(client_call.remote_read).to be_nil
  end

  it 'unmarshals the response using the unmarshal func' do
    core_call = make_test_call
    ActiveCall.client_invoke(core_call)
    unmarshal = proc { |x| 'unmarshalled:' + x }
    client_call = ActiveCall.new(core_call, @pass_through, unmarshal,
                                 deadline)

    # confirm the client receives the unmarshalled message
    request = 'message is a string'
    client_call.remote_send(request)
    server_call = expect_server_to_receive(request)
    server_call.remote_send('server_response')
    expect(client_call.remote_read).to eq('unmarshalled:server_response')

    server_core_call = inner_call_of_active_call(server_call)
    send_and_receive_close_and_status(core_call, server_core_call)
  end
end
462
# Covers ActiveCall#each_remote_read: it returns an Enumerator that yields
# each server reply and stops once an OK status arrives.
describe '#each_remote_read' do
  it 'creates an Enumerator' do
    client_call = ActiveCall.new(make_test_call, @pass_through,
                                 @pass_through, deadline)
    expect(client_call.each_remote_read).to be_a(Enumerator)
    # finish the call
    client_call.cancel
  end

  it 'the returned enumerator can read n responses' do
    core_call = make_test_call
    ActiveCall.client_invoke(core_call)
    client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                 deadline)
    request = 'message is a string'
    response = 'server_response'
    client_call.remote_send(request)
    server_call = expect_server_to_receive(request)

    replies = client_call.each_remote_read
    # 3 is an arbitrary value > 1
    3.times do
      server_call.remote_send(response)
      expect(replies.next).to eq(response)
    end

    server_core_call = inner_call_of_active_call(server_call)
    send_and_receive_close_and_status(core_call, server_core_call)
  end

  it 'the returns an enumerator that stops after an OK Status' do
    core_call = make_test_call
    ActiveCall.client_invoke(core_call)
    client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                 deadline)
    request = 'message is a string'
    response = 'server_response'
    client_call.remote_send(request)
    core_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
    server_call = expect_server_to_receive(request)

    replies = client_call.each_remote_read
    # 3 is an arbitrary value > 1
    3.times do
      server_call.remote_send(response)
      expect(replies.next).to eq(response)
    end

    server_call.send_status(OK, 'OK', true)
    expect { replies.next }.to raise_error(StopIteration)
  end
end
512
# Covers the client closing its side of the call in different orders relative
# to the server's reply and status.
describe '#closing the call from the client' do
  it 'finishes ok if the server sends a status response' do
    core_call = make_test_call
    ActiveCall.client_invoke(core_call)
    client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                 deadline)
    request = 'message is a string'
    client_call.remote_send(request)

    # the client closes before the server has replied
    expect do
      core_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
    end.to_not raise_error

    server_call = expect_server_to_receive(request)
    server_call.remote_send('server_response')
    expect(client_call.remote_read).to eq('server_response')
    server_call.send_status(OK, 'status code is OK')
    expect { client_call.receive_and_check_status }.to_not raise_error
  end

  it 'finishes ok if the server sends an early status response' do
    core_call = make_test_call
    ActiveCall.client_invoke(core_call)
    client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                 deadline)
    request = 'message is a string'
    client_call.remote_send(request)

    # the server replies and sends its status before the client closes
    server_call = expect_server_to_receive(request)
    server_call.remote_send('server_response')
    server_call.send_status(OK, 'status code is OK')
    expect(client_call.remote_read).to eq('server_response')

    expect do
      core_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
    end.to_not raise_error
    expect { client_call.receive_and_check_status }.to_not raise_error
  end

  it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
    core_call = make_test_call
    ActiveCall.client_invoke(core_call)
    client_call = ActiveCall.new(core_call, @pass_through, @pass_through,
                                 deadline)
    request = 'message is a string'
    client_call.remote_send(request)

    server_call = expect_server_to_receive(request)
    server_call.remote_send('server_response')
    server_call.send_status(OK, 'status code is OK')
    expect(client_call.remote_read).to eq('server_response')

    # close and status receipt go out together in a single batch
    close_and_recv_status = {
      CallOps::SEND_CLOSE_FROM_CLIENT => nil,
      CallOps::RECV_STATUS_ON_CLIENT => nil
    }
    expect { core_call.run_batch(close_and_recv_status) }.to_not raise_error
  end
end
566
567  # Test sending of the initial metadata in #run_server_bidi
568  # from the server handler both implicitly and explicitly.
# Each example only starts a server thread running #run_server_bidi with a
# particular handler; the shared after hook then drives the client side and
# performs all of the assertions.
describe '#run_server_bidi metadata sending tests', run_server_bidi: true do
  before(:each) do
    @requests = ['first message', 'second message']
    @server_to_client_metadata = { 'test_key' => 'test_val' }
    @server_status = OK

    @client_call = make_test_call
    @client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})

    accepted_call = @server.request_call.call
    @server_call = ActiveCall.new(
      accepted_call, @pass_through, @pass_through, deadline,
      metadata_received: true,
      started: false,
      metadata_to_send: @server_to_client_metadata)
  end

  after(:each) do
    # Send the requests and send a close so the server can send a status
    @requests.each do |request|
      @client_call.run_batch(CallOps::SEND_MESSAGE => request)
    end
    @client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)

    @server_thread.join

    # Expect that initial metadata was sent,
    # the requests were echoed, and a status was sent
    initial_md_result =
      @client_call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
    expect(initial_md_result.metadata).to eq(@server_to_client_metadata)

    @requests.each do |request|
      echoed = @client_call.run_batch(CallOps::RECV_MESSAGE => nil)
      expect(echoed.message).to eq(request)
    end

    status_result =
      @client_call.run_batch(CallOps::RECV_STATUS_ON_CLIENT => nil)
    expect(status_result.status.code).to eq(@server_status)
  end

  it 'sends the initial metadata implicitly if not already sent' do
    # Server handler that doesn't have access to a "call"
    # It echoes the requests
    echo_handler = proc { |msgs| msgs }
    interception_ctx = GRPC::InterceptionContext.new

    @server_thread = Thread.new do
      @server_call.run_server_bidi(echo_handler, interception_ctx)
      @server_call.send_status(@server_status)
    end
  end

  it 'sends the metadata when sent explicitly and not already sent' do
    # Fake server handler that has access to a "call" object and
    # uses it to explicitly update and send the initial metadata
    echo_handler_with_call = proc do |msgs, call_param|
      call_param.merge_metadata_to_send(@server_to_client_metadata)
      call_param.send_initial_metadata
      msgs
    end
    interception_ctx = GRPC::InterceptionContext.new

    @server_thread = Thread.new do
      @server_call.run_server_bidi(echo_handler_with_call, interception_ctx)
      @server_call.send_status(@server_status)
    end
  end
end
649
# Accepts the pending call on the server (forwarding kw to
# expect_server_to_be_invoked), expects its first read to equal sent_text,
# and returns the server-side call.
def expect_server_to_receive(sent_text, **kw)
  expect_server_to_be_invoked(**kw).tap do |server_call|
    expect(server_call.remote_read).to eq(sent_text)
  end
end
655
# Takes the next rpc from @server, expects it to be non-nil, sends kw as the
# server's initial metadata on its call, and returns that call wrapped in an
# ActiveCall constructed with metadata_received and started both true.
def expect_server_to_be_invoked(**kw)
  incoming_rpc = @server.request_call
  expect(incoming_rpc).to_not eq nil

  server_core_call = incoming_rpc.call
  server_core_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw)

  active_call_opts = { metadata_received: true, started: true }
  ActiveCall.new(server_core_call, @pass_through, @pass_through, deadline,
                 **active_call_opts)
end
664
# Creates a new core call for '/method' on the shared test channel, using a
# fresh deadline.
def make_test_call
  call_deadline = deadline
  @ch.create_call(nil, nil, '/method', nil, call_deadline)
end
668
# Returns a Time two seconds after the moment of the call; the two-second
# window is arbitrary.
def deadline
  timeout_secs = 2
  Time.now + timeout_secs
end
672end
673