/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.benchmarks.netty;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

/**
 * Benchmark measuring messages per second using a set of permanently open duplex streams which
 * ping-pong messages.
 */
@State(Scope.Benchmark)
@Fork(1)
public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark {
  private static final Logger logger =
      Logger.getLogger(StreamingPingPongsPerSecondBenchmark.class.getName());

  /** Number of channels handed to the base-class setup. */
  @Param({"1", "2", "4", "8"})
  public int channelCount = 1;

  /**
   * Passed to the base-class setup as the stream limit and to {@code startStreamingCalls} as the
   * number of calls to start.
   */
  @Param({"1", "10", "100", "1000"})
  public int maxConcurrentStreams = 1;

  // Static so that the static nested AdditionalCounters class can read and reset it; a fresh
  // instance is assigned on every trial in setup().
  private static AtomicLong callCounter;
  // Set to true in teardown() before waiting for the calls to finish.
  private AtomicBoolean completed;
  // True only while pingPong() is inside its measurement window.
  private AtomicBoolean record;
  // Returned by startStreamingCalls; teardown() waits on it for the calls to wind down.
  private CountDownLatch latch;

  /**
   * Use an AuxCounter so we can measure the calls as they occur without consuming CPU
   * in the benchmark method.
   */
  @AuxCounters
  @State(Scope.Thread)
  public static class AdditionalCounters {

    /** Resets the shared call counter to zero at the start of every iteration. */
    @Setup(Level.Iteration)
    public void clean() {
      callCounter.set(0);
    }

    /**
     * Reports the current value of the shared call counter.
     *
     * @return the counter value accumulated since the last {@link #clean()}
     */
    public long pingPongsPerSecond() {
      return callCounter.get();
    }
  }

  /**
   * Setup with direct executors, small payloads, a medium flow-control window and NIO channels,
   * then start the streaming calls that run for the whole trial.
   *
   * @throws Exception if the base-class setup or starting the calls fails
   */
  @Setup(Level.Trial)
  public void setup() throws Exception {
    super.setup(ExecutorType.DIRECT,
        ExecutorType.DIRECT,
        MessageSize.SMALL,
        MessageSize.SMALL,
        FlowWindowSize.MEDIUM,
        ChannelType.NIO,
        maxConcurrentStreams,
        channelCount);
    callCounter = new AtomicLong();
    completed = new AtomicBoolean();
    record = new AtomicBoolean();
    latch = startStreamingCalls(maxConcurrentStreams, callCounter, record, completed, 1);
  }

  /**
   * Stop the running calls then stop the server and client channels.
   *
   * @throws Exception if waiting for the calls is interrupted or the base-class teardown fails
   */
  @Override
  @TearDown(Level.Trial)
  public void teardown() throws Exception {
    completed.set(true);
    try {
      if (!latch.await(5, TimeUnit.SECONDS)) {
        logger.warning("Failed to shutdown all calls.");
      }
    } finally {
      // Always release the server and channels, even if the wait above was interrupted.
      super.teardown();
    }
  }

  /**
   * Measure throughput of streaming ping-pongs. The calls are already running, we just open a
   * recording window and let the call counter accumulate while this method sleeps.
   *
   * @param counters not used directly; declared so that JMH associates the auxiliary counters
   *     with this benchmark method
   * @throws Exception if the sleep is interrupted
   */
  @Benchmark
  public void pingPong(AdditionalCounters counters) throws Exception {
    record.set(true);
    try {
      // No need to do anything, just sleep here.
      Thread.sleep(1001);
    } finally {
      // Close the recording window even if the sleep was interrupted.
      record.set(false);
    }
  }

  /**
   * Useful for triggering a subset of the benchmark in a profiler.
   *
   * @param argv ignored
   * @throws Exception if setup, the sleep, or teardown fails
   */
  public static void main(String[] argv) throws Exception {
    StreamingPingPongsPerSecondBenchmark bench = new StreamingPingPongsPerSecondBenchmark();
    bench.setup();
    try {
      Thread.sleep(30000);
    } finally {
      // Make sure calls, server and channels are shut down even if the sleep is interrupted.
      bench.teardown();
    }
    System.exit(0);
  }
}