1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.benchmarks.netty;
18 
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.concurrent.atomic.AtomicLong;
23 import org.openjdk.jmh.annotations.AuxCounters;
24 import org.openjdk.jmh.annotations.Benchmark;
25 import org.openjdk.jmh.annotations.Fork;
26 import org.openjdk.jmh.annotations.Level;
27 import org.openjdk.jmh.annotations.Param;
28 import org.openjdk.jmh.annotations.Scope;
29 import org.openjdk.jmh.annotations.Setup;
30 import org.openjdk.jmh.annotations.State;
31 import org.openjdk.jmh.annotations.TearDown;
32 
33 /**
34  * Benchmark intended to test response bandwidth in bytes/sec for streaming calls by permuting
35  * payload size and flow-control windows with number of concurrent calls. Async stubs are used
36  * to avoid context-switching overheads.
37  */
38 @State(Scope.Benchmark)
39 @Fork(1)
40 public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {
41 
42   @Param({"1", "10"})
43   public int maxConcurrentStreams = 1;
44 
45   @Param({"LARGE", "JUMBO"})
46   public MessageSize responseSize = MessageSize.JUMBO;
47 
48   @Param({"MEDIUM", "LARGE", "JUMBO"})
49   public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM;
50 
51   private static AtomicLong callCounter;
52   private AtomicBoolean completed;
53   private AtomicBoolean record;
54   private CountDownLatch latch;
55 
56   /**
57    * Use an AuxCounter so we can measure that calls as they occur without consuming CPU
58    * in the benchmark method.
59    */
60   @AuxCounters
61   @State(Scope.Thread)
62   public static class AdditionalCounters {
63 
64     @Setup(Level.Iteration)
clean()65     public void clean() {
66       callCounter.set(0);
67     }
68 
megabitsPerSecond()69     public long megabitsPerSecond() {
70       return (callCounter.get() * 8) >> 20;
71     }
72   }
73 
74   /**
75    * Setup with direct executors and one channel.
76    */
77   @Setup(Level.Trial)
setup()78   public void setup() throws Exception {
79     super.setup(ExecutorType.DIRECT,
80         ExecutorType.DIRECT,
81         MessageSize.SMALL,
82         responseSize,
83         clientInboundFlowWindow,
84         ChannelType.NIO,
85         maxConcurrentStreams,
86         1);
87     callCounter = new AtomicLong();
88     completed = new AtomicBoolean();
89     record = new AtomicBoolean();
90     latch = startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed,
91         responseSize.bytes());
92   }
93 
94   /**
95    * Stop the running calls then stop the server and client channels.
96    */
97   @Override
98   @TearDown(Level.Trial)
teardown()99   public void teardown() throws Exception {
100     completed.set(true);
101     if (!latch.await(5, TimeUnit.SECONDS)) {
102       System.err.println("Failed to shutdown all calls.");
103     }
104     super.teardown();
105   }
106 
107   /**
108    * Measure bandwidth of streamed responses.
109    */
110   @Benchmark
stream(AdditionalCounters counters)111   public void stream(AdditionalCounters counters) throws Exception {
112     record.set(true);
113     // No need to do anything, just sleep here.
114     Thread.sleep(1001);
115     record.set(false);
116   }
117 
118   /**
119    * Useful for triggering a subset of the benchmark in a profiler.
120    */
main(String[] argv)121   public static void main(String[] argv) throws Exception {
122     StreamingResponseBandwidthBenchmark bench = new StreamingResponseBandwidthBenchmark();
123     bench.setup();
124     Thread.sleep(30000);
125     bench.teardown();
126     System.exit(0);
127   }
128 }
129