1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.benchmarks.netty;
18 
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.concurrent.atomic.AtomicLong;
23 import java.util.logging.Logger;
24 import org.openjdk.jmh.annotations.AuxCounters;
25 import org.openjdk.jmh.annotations.Benchmark;
26 import org.openjdk.jmh.annotations.Fork;
27 import org.openjdk.jmh.annotations.Level;
28 import org.openjdk.jmh.annotations.Param;
29 import org.openjdk.jmh.annotations.Scope;
30 import org.openjdk.jmh.annotations.Setup;
31 import org.openjdk.jmh.annotations.State;
32 import org.openjdk.jmh.annotations.TearDown;
33 
34 /**
35  * Benchmark measuring messages per second using a set of permanently open duplex streams which
36  * ping-pong messages.
37  */
38 @State(Scope.Benchmark)
39 @Fork(1)
40 public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark {
41   private static final Logger logger =
42       Logger.getLogger(StreamingPingPongsPerSecondBenchmark.class.getName());
43 
44   @Param({"1", "2", "4", "8"})
45   public int channelCount = 1;
46 
47   @Param({"1", "10", "100", "1000"})
48   public int maxConcurrentStreams = 1;
49 
50   private static AtomicLong callCounter;
51   private AtomicBoolean completed;
52   private AtomicBoolean record;
53   private CountDownLatch latch;
54 
55   /**
56    * Use an AuxCounter so we can measure that calls as they occur without consuming CPU
57    * in the benchmark method.
58    */
59   @AuxCounters
60   @State(Scope.Thread)
61   public static class AdditionalCounters {
62 
63     @Setup(Level.Iteration)
clean()64     public void clean() {
65       callCounter.set(0);
66     }
67 
pingPongsPerSecond()68     public long pingPongsPerSecond() {
69       return callCounter.get();
70     }
71   }
72 
73   /**
74    * Setup with direct executors, small payloads and the default flow-control window.
75    */
76   @Setup(Level.Trial)
setup()77   public void setup() throws Exception {
78     super.setup(ExecutorType.DIRECT,
79         ExecutorType.DIRECT,
80         MessageSize.SMALL,
81         MessageSize.SMALL,
82         FlowWindowSize.MEDIUM,
83         ChannelType.NIO,
84         maxConcurrentStreams,
85         channelCount);
86     callCounter = new AtomicLong();
87     completed = new AtomicBoolean();
88     record = new AtomicBoolean();
89     latch = startStreamingCalls(maxConcurrentStreams, callCounter, record, completed, 1);
90   }
91 
92   /**
93    * Stop the running calls then stop the server and client channels.
94    */
95   @Override
96   @TearDown(Level.Trial)
teardown()97   public void teardown() throws Exception {
98     completed.set(true);
99     if (!latch.await(5, TimeUnit.SECONDS)) {
100       logger.warning("Failed to shutdown all calls.");
101     }
102     super.teardown();
103   }
104 
105   /**
106    * Measure throughput of unary calls. The calls are already running, we just observe a counter
107    * of received responses.
108    */
109   @Benchmark
pingPong(AdditionalCounters counters)110   public void pingPong(AdditionalCounters counters) throws Exception {
111     record.set(true);
112     // No need to do anything, just sleep here.
113     Thread.sleep(1001);
114     record.set(false);
115   }
116 
117   /**
118    * Useful for triggering a subset of the benchmark in a profiler.
119    */
main(String[] argv)120   public static void main(String[] argv) throws Exception {
121     StreamingPingPongsPerSecondBenchmark bench = new StreamingPingPongsPerSecondBenchmark();
122     bench.setup();
123     Thread.sleep(30000);
124     bench.teardown();
125     System.exit(0);
126   }
127 }
128