1#!/usr/bin/env ruby
2
3# Copyright 2015 gRPC authors.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17# pubsub_demo demos accesses the Google PubSub API via its gRPC interface
18#
19# $ GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_key_file> \
20#   path/to/pubsub_demo.rb \
21#   [--action=<chosen_demo_action> ]
22#
23# There are options related to the chosen action, see #parse_args below.
24# - the possible actions are given by the method names of NamedAction class
25# - the default action is list_some_topics
26
27this_dir = File.expand_path(File.dirname(__FILE__))
28lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
29$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
30$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
31
32require 'optparse'
33
34require 'grpc'
35require 'googleauth'
36require 'google/protobuf'
37
38require 'google/protobuf/empty'
39require 'tech/pubsub/proto/pubsub'
40require 'tech/pubsub/proto/pubsub_services'
41
42# creates a SSL Credentials from the production certificates.
43def ssl_creds
44  GRPC::Core::ChannelCredentials.new()
45end
46
47# Builds the metadata authentication update proc.
48def auth_proc(opts)
49  auth_creds = Google::Auth.get_application_default
50  return auth_creds.updater_proc
51end
52
53# Creates a stub for accessing the publisher service.
54def publisher_stub(opts)
55  address = "#{opts.host}:#{opts.port}"
56  stub_clz = Tech::Pubsub::PublisherService::Stub # shorter
57  GRPC.logger.info("... access PublisherService at #{address}")
58  call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
59  combined_creds = ssl_creds.compose(call_creds)
60  stub_clz.new(address, creds: combined_creds,
61               GRPC::Core::Channel::SSL_TARGET => opts.host)
62end
63
64# Creates a stub for accessing the subscriber service.
65def subscriber_stub(opts)
66  address = "#{opts.host}:#{opts.port}"
67  stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter
68  GRPC.logger.info("... access SubscriberService at #{address}")
69  call_creds = GRPC::Core::CallCredentials.new(auth_proc(opts))
70  combined_creds = ssl_creds.compose(call_creds)
71  stub_clz.new(address, creds: combined_creds,
72               GRPC::Core::Channel::SSL_TARGET => opts.host)
73end
74
75# defines methods corresponding to each interop test case.
76class NamedActions
77  include Tech::Pubsub
78
79  # Initializes NamedActions
80  #
81  # @param pub [Stub] a stub for accessing the publisher service
82  # @param sub [Stub] a stub for accessing the publisher service
83  # @param args [Args] provides access to the command line
84  def initialize(pub, sub, args)
85    @pub = pub
86    @sub = sub
87    @args = args
88  end
89
90  # Removes the test topic if it exists
91  def remove_topic
92    name = test_topic_name
93    p "... removing Topic #{name}"
94    @pub.delete_topic(DeleteTopicRequest.new(topic: name))
95    p "removed Topic: #{name} OK"
96  rescue GRPC::BadStatus => e
97    p "Could not delete a topics: rpc failed with '#{e}'"
98  end
99
100  # Creates a test topic
101  def create_topic
102    name = test_topic_name
103    p "... creating Topic #{name}"
104    resp = @pub.create_topic(Topic.new(name: name))
105    p "created Topic: #{resp.name} OK"
106  rescue GRPC::BadStatus => e
107    p "Could not create a topics: rpc failed with '#{e}'"
108  end
109
110  # Lists topics in the project
111  def list_some_topics
112    p 'Listing topics'
113    p '-------------_'
114    list_project_topics.topic.each { |t| p t.name }
115  rescue GRPC::BadStatus => e
116    p "Could not list topics: rpc failed with '#{e}'"
117  end
118
119  # Checks if a topics exists in a project
120  def check_exists
121    name = test_topic_name
122    p "... checking for topic #{name}"
123    exists = topic_exists?(name)
124    p "#{name} is a topic" if exists
125    p "#{name} is not a topic" unless exists
126  rescue GRPC::BadStatus => e
127    p "Could not check for a topics: rpc failed with '#{e}'"
128  end
129
130  # Publishes some messages
131  def random_pub_sub
132    topic_name, sub_name = test_topic_name, test_sub_name
133    create_topic_if_needed(topic_name)
134    @sub.create_subscription(Subscription.new(name: sub_name,
135                                              topic: topic_name))
136    msg_count = rand(10..30)
137    msg_count.times do |x|
138      msg = PubsubMessage.new(data: "message #{x}")
139      @pub.publish(PublishRequest.new(topic: topic_name, message: msg))
140    end
141    p "Sent #{msg_count} messages to #{topic_name}, checking for them now."
142    batch = @sub.pull_batch(PullBatchRequest.new(subscription: sub_name,
143                                                 max_events: msg_count))
144    ack_ids = batch.pull_responses.map { |x| x.ack_id }
145    p "Got #{ack_ids.size} messages; acknowledging them.."
146    @sub.acknowledge(AcknowledgeRequest.new(subscription: sub_name,
147                                            ack_id: ack_ids))
148    p "Test messages were acknowledged OK, deleting the subscription"
149    del_req = DeleteSubscriptionRequest.new(subscription: sub_name)
150    @sub.delete_subscription(del_req)
151  rescue GRPC::BadStatus => e
152    p "Could not do random pub sub: rpc failed with '#{e}'"
153  end
154
155  private
156
157  # test_topic_name is the topic name to use in this test.
158  def test_topic_name
159    unless @args.topic_name.nil?
160      return "/topics/#{@args.project_id}/#{@args.topic_name}"
161    end
162    now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L')
163    "/topics/#{@args.project_id}/#{ENV['USER']}-#{now_text}"
164  end
165
166  # test_sub_name is the subscription name to use in this test.
167  def test_sub_name
168    unless @args.sub_name.nil?
169      return "/subscriptions/#{@args.project_id}/#{@args.sub_name}"
170    end
171    now_text = Time.now.utc.strftime('%Y%m%d%H%M%S%L')
172    "/subscriptions/#{@args.project_id}/#{ENV['USER']}-#{now_text}"
173  end
174
175  # determines if the topic name exists
176  def topic_exists?(name)
177    topics = list_project_topics.topic.map { |t| t.name }
178    topics.include?(name)
179  end
180
181  def create_topic_if_needed(name)
182    return if topic_exists?(name)
183    @pub.create_topic(Topic.new(name: name))
184  end
185
186  def list_project_topics
187    q = "cloud.googleapis.com/project in (/projects/#{@args.project_id})"
188    @pub.list_topics(ListTopicsRequest.new(query: q))
189  end
190end
191
192# Args is used to hold the command line info.
193Args = Struct.new(:host, :port, :action, :project_id, :topic_name,
194                  :sub_name)
195
196# validates the command line options, returning them as an Arg.
197def parse_args
198  args = Args.new('pubsub-staging.googleapis.com',
199                   443, 'list_some_topics', 'stoked-keyword-656')
200  OptionParser.new do |opts|
201    opts.on('--server_host SERVER_HOST', 'server hostname') do |v|
202      args.host = v
203    end
204    opts.on('--server_port SERVER_PORT', 'server port') do |v|
205      args.port = v
206    end
207
208    # instance_methods(false) gives only the methods defined in that class.
209    scenes = NamedActions.instance_methods(false).map { |t| t.to_s }
210    scene_list = scenes.join(',')
211    opts.on("--action CODE", scenes, {}, 'pick a demo action',
212            "  (#{scene_list})") do |v|
213      args.action = v
214    end
215
216    # Set the remaining values.
217    %w(project_id topic_name sub_name).each do |o|
218      opts.on("--#{o} VALUE", "#{o}") do |v|
219        args[o] = v
220      end
221    end
222  end.parse!
223  _check_args(args)
224end
225
226def _check_args(args)
227  %w(host port action).each do |a|
228    if args[a].nil?
229      raise OptionParser::MissingArgument.new("please specify --#{a}")
230    end
231  end
232  args
233end
234
235def main
236  args = parse_args
237  pub, sub = publisher_stub(args), subscriber_stub(args)
238  NamedActions.new(pub, sub, args).method(args.action).call
239end
240
241main
242