1#!/usr/bin/env ruby
2
3# Copyright 2015 gRPC authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
# interop_server is a testing app that runs a gRPC interop testing server.
#
# It helps validate interoperation between different gRPC implementations
# and between gRPC running in different environments.
#
# Usage: $ path/to/interop_server.rb --port PORT [--use_tls true|false]
24
# Prepend three directories to $LOAD_PATH (skipping any that are already
# listed): <grandparent>/lib, the parent directory, and this file's directory.
this_dir = File.expand_path(File.dirname(__FILE__))
pb_dir = File.dirname(this_dir)
lib_dir = File.join(File.dirname(pb_dir), 'lib')
[lib_dir, pb_dir, this_dir].each do |dir|
  $LOAD_PATH.unshift(dir) unless $LOAD_PATH.include?(dir)
end
31
32require 'forwardable'
33require 'logger'
34require 'optparse'
35
36require 'grpc'
37
38require_relative '../src/proto/grpc/testing/empty_pb'
39require_relative '../src/proto/grpc/testing/messages_pb'
40require_relative '../src/proto/grpc/testing/test_services_pb'
41
# DebugIsTruncated extends the default Logger to truncate debug messages
class DebugIsTruncated < Logger
  # Longest debug message, in characters, that is logged without truncation.
  MAX_DEBUG_LENGTH = 1024

  # Logs a debug message, shortening String messages to MAX_DEBUG_LENGTH.
  #
  # Accepts the same call forms as Logger#debug: the message can be passed as
  # +s+ or produced lazily by a block. Non-String values (nil, exceptions, ...)
  # are handed to Logger unchanged instead of being measured with #length.
  def debug(s = nil, &block)
    return super(truncate_if_string(s)) unless block
    super(truncate_if_string(s)) { truncate_if_string(block.call) }
  end

  # Truncates +s+ after +truncate_at+ characters if it is longer than that:
  #
  #   truncate('Once upon a time in a world far far away', 27)
  #   # => "Once upon a time in a wo..."
  #
  # Pass a string or regexp <tt>:separator</tt> to truncate +s+ at a natural break:
  #
  #   truncate('Once upon a time in a world far far away', 27, separator: ' ')
  #   # => "Once upon a time in a..."
  #
  #   truncate('Once upon a time in a world far far away', 27, separator: /\s/)
  #   # => "Once upon a time in a..."
  #
  # The last characters are replaced with the <tt>:omission</tt> string
  # (defaults to "..."), so the result does not exceed +truncate_at+ as long as
  # the omission itself fits within it:
  #
  #   truncate('And they found that many people were sleeping better.', 25,
  #            omission: '... (continued)')
  #   # => "And they f... (continued)"
  #
  # Returns +s+ itself when it is not longer than +truncate_at+.
  def truncate(s, truncate_at, options = {})
    return s unless s.length > truncate_at
    omission = options[:omission] || '...'
    # Clamp at zero so an omission longer than the limit cannot yield a
    # negative length (which would make rindex count from the end).
    with_extra_room = [truncate_at - omission.length, 0].max
    stop = with_extra_room
    if options[:separator]
      # Search within the message itself; the logger has no #rindex.
      stop = s.rindex(options[:separator], with_extra_room) || with_extra_room
    end
    "#{s[0, stop]}#{omission}"
  end

  private

  # Truncates String messages; returns any other object untouched.
  def truncate_if_string(message)
    message.is_a?(String) ? truncate(message, MAX_DEBUG_LENGTH) : message
  end
end
79
# RubyLogger provides a #logger method backed by one shared, standard-library
# based logger instance.
module RubyLogger
  # Shared DebugIsTruncated logger writing to STDOUT at WARN level.
  LOGGER = DebugIsTruncated.new(STDOUT).tap { |log| log.level = Logger::WARN }

  # Returns the shared LOGGER instance.
  def logger
    LOGGER
  end
end
89
# GRPC is the general RPC module
module GRPC
  # Mix RubyLogger into GRPC at the module level, so the #logger method it
  # defines (returning RubyLogger::LOGGER) becomes available on GRPC itself.
  # The extend is unconditional.
  extend RubyLogger
end
95
# Loads the certificates used by the test server.
#
# data_dir - optional directory containing the certificate files. When omitted
#            it defaults to spec/testdata under the directory two levels above
#            this file's directory.
#
# Returns an Array with the contents of ca.pem, server1.key and server1.pem,
# in that order.
def load_test_certs(data_dir = nil)
  if data_dir.nil?
    this_dir = File.expand_path(File.dirname(__FILE__))
    data_dir = File.join(File.dirname(File.dirname(this_dir)), 'spec/testdata')
  end
  # File.read closes each file once it is read; File.open(...).read left the
  # handle open until garbage collection.
  %w[ca.pem server1.key server1.pem].map do |name|
    File.read(File.join(data_dir, name))
  end
end
103
# Builds a GRPC::Core::ServerCredentials from the test certificates.
#
# Only the server key and certificate chain returned by load_test_certs are
# used; nil is passed as the first argument and false as the third.
def test_server_creds
  _ca, private_key, cert_chain = load_test_certs
  key_cert_pairs = [{ private_key: private_key, cert_chain: cert_chain }]
  GRPC::Core::ServerCredentials.new(nil, key_cert_pairs, false)
end
110
# Produces a binary (ASCII-8BIT) string of l null chars (\0).
#
# Raises a RuntimeError naming the offending value when l is negative.
def nulls(l)
  # Double quotes so that the rejected length is actually interpolated.
  fail "requires #{l} to be +ve" if l < 0
  ("\0" * l).b
end
116
# Echoes the interop test metadata back to the caller.
#
# If the call's received metadata contains "x-grpc-test-echo-initial", its
# value is copied into the call's metadata_to_send; if it contains
# "x-grpc-test-echo-trailing-bin", its value is copied into output_metadata.
def maybe_echo_metadata(_call)
  received = _call.metadata

  # these are consistent for all interop tests
  initial_key = "x-grpc-test-echo-initial"
  trailing_key = "x-grpc-test-echo-trailing-bin"

  _call.metadata_to_send[initial_key] = received[initial_key] if received.key?(initial_key)
  _call.output_metadata[trailing_key] = received[trailing_key] if received.key?(trailing_key)
end
130
# Fails the RPC with the status requested by the client, if any.
#
# When req.response_status is set, raises the exception built by
# GRPC::BadStatus.new_status_exception from its code and message; otherwise
# does nothing and returns nil.
def maybe_echo_status_and_message(req)
  status = req.response_status
  return if status.nil?
  fail GRPC::BadStatus.new_status_exception(status.code, status.message)
end
137
# A FullDuplexEnumerator walks a stream of requests and yields one generated
# response for every response parameter found in each request.
class FullDuplexEnumerator
  include Grpc::Testing
  include Grpc::Testing::PayloadType

  # requests - an enumerable of the incoming requests.
  def initialize(requests)
    @requests = requests
  end

  # Yields a StreamingOutputCallResponse per requested response size, with a
  # payload body of that many null bytes. Without a block, returns an
  # Enumerator over the same responses.
  #
  # Any StandardError raised while processing is logged and then re-raised.
  def each_item
    return enum_for(:each_item) unless block_given?
    GRPC.logger.info('interop-server: started receiving')
    begin
      @requests.each do |request|
        maybe_echo_status_and_message(request)
        request.response_parameters.each do |params|
          size = params.size
          GRPC.logger.info("read a req, response size is #{size}")
          payload = Payload.new(type: request.response_type, body: nulls(size))
          yield StreamingOutputCallResponse.new(payload: payload)
        end
      end
      GRPC.logger.info('interop-server: finished receiving')
    rescue StandardError => error
      GRPC.logger.info('interop-server: failed')
      GRPC.logger.warn(error)
      raise error
    end
  end
end
168
# A runnable implementation of the schema-specified testing service, with each
# service method implemented as required by the interop testing spec.
class TestTarget < Grpc::Testing::TestService::Service
  include Grpc::Testing
  include Grpc::Testing::PayloadType

  # Answers every request with an Empty message.
  def empty_call(_empty, _call)
    Empty.new
  end

  # Echoes the test metadata, fails with the requested status if one was
  # supplied, and otherwise returns a SimpleResponse whose payload body is
  # simple_req.response_size null bytes.
  def unary_call(simple_req, _call)
    maybe_echo_metadata(_call)
    maybe_echo_status_and_message(simple_req)
    body = nulls(simple_req.response_size)
    SimpleResponse.new(payload: Payload.new(type: :COMPRESSABLE, body: body))
  end

  # Reads every request from the call and reports the total length of all
  # payload bodies received.
  def streaming_input_call(call)
    total = call.each_remote_read.sum { |req| req.payload.body.length }
    StreamingInputCallResponse.new(aggregated_payload_size: total)
  end

  # Returns one response per entry in req.response_parameters, each with a
  # payload body of the requested number of null bytes.
  def streaming_output_call(req, _call)
    req.response_parameters.map do |params|
      payload = Payload.new(type: req.response_type, body: nulls(params.size))
      StreamingOutputCallResponse.new(payload: payload)
    end
  end

  # Echoes the test metadata and returns an enumerator producing the responses
  # requested by each incoming request.
  def full_duplex_call(reqs, _call)
    maybe_echo_metadata(_call)
    # reqs is a lazy Enumerator of the requests sent by the client.
    FullDuplexEnumerator.new(reqs).each_item
  end

  # Behaves like full_duplex_call, minus the metadata echo.
  def half_duplex_call(reqs)
    # TODO: update with unique behaviour of the half_duplex_call if that's
    # ever required by any of the tests.
    #
    # This handler receives no call object, so it cannot delegate to
    # full_duplex_call(reqs, _call) (doing so with a single argument raised
    # ArgumentError); build the response enumerator directly instead.
    FullDuplexEnumerator.new(reqs).each_item
  end
end
213
# Parses and validates the command line options (consuming them from ARGV).
#
# Returns a Hash with 'port' (the String given to --port) and 'secure' (true
# only when --use_tls true was given). Raises OptionParser::MissingArgument
# when --port is absent.
def parse_options
  options = { 'port' => nil, 'secure' => false }

  parser = OptionParser.new
  parser.banner = 'Usage: --port port'
  parser.on('--port PORT', 'server port') { |port| options['port'] = port }
  parser.on('--use_tls USE_TLS', %w[false true],
            'require a secure connection?') do |flag|
    options['secure'] = (flag == 'true')
  end
  parser.parse!

  fail(OptionParser::MissingArgument, 'please specify --port') if options['port'].nil?
  options
end
236
# Starts the interop test server on 0.0.0.0 at the port given on the command
# line, using the test credentials when a secure connection was requested,
# and runs it until terminated.
def main
  opts = parse_options
  host = "0.0.0.0:#{opts['port']}"
  server = GRPC::RpcServer.new
  if opts['secure']
    creds = test_server_creds
    notice = "... running securely on #{host}"
  else
    creds = :this_port_is_insecure
    notice = "... running insecurely on #{host}"
  end
  server.add_http2_port(host, creds)
  GRPC.logger.info(notice)
  server.handle(TestTarget)
  server.run_till_terminated
end
251
# Start the interop server as soon as this script is loaded.
main
253