/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.benchmarks.netty;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

/**
 * Benchmark intended to test response bandwidth in bytes/sec for streaming calls by permuting
 * payload size and flow-control windows with number of concurrent calls. Async stubs are used
 * to avoid context-switching overheads.
 */
@State(Scope.Benchmark)
@Fork(1)
public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {

  @Param({"1", "10"})
  public int maxConcurrentStreams = 1;

  @Param({"LARGE", "JUMBO"})
  public MessageSize responseSize = MessageSize.JUMBO;

  @Param({"MEDIUM", "LARGE", "JUMBO"})
  public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM;

  // Static so the static nested AdditionalCounters class can reach it; it is re-created by
  // every call to setup().
  private static AtomicLong callCounter;
  // Set to true in teardown() to ask the running calls to finish.
  private AtomicBoolean completed;
  // True only while stream() is executing.
  private AtomicBoolean record;
  // Awaited in teardown(); returned by startFlowControlledStreamingCalls.
  private CountDownLatch latch;

  /**
   * Use an AuxCounter so we can measure the calls as they occur without consuming CPU
   * in the benchmark method.
   */
  @AuxCounters
  @State(Scope.Thread)
  public static class AdditionalCounters {

    /**
     * Resets the shared counter to zero at the start of each iteration.
     */
    @Setup(Level.Iteration)
    public void clean() {
      callCounter.set(0);
    }

    /**
     * Returns the current counter value multiplied by 8 and divided by 2^20.
     *
     * <p>NOTE(review): this presumes the counter accumulates received bytes, which would make
     * the result a megabit count; confirm against {@code startFlowControlledStreamingCalls}.
     *
     * @return the scaled counter value
     */
    public long megabitsPerSecond() {
      return (callCounter.get() * 8) >> 20;
    }
  }

  /**
   * Setup with direct executors and one channel.
   *
   * @throws Exception if the parent setup or starting the streaming calls fails
   */
  @Setup(Level.Trial)
  public void setup() throws Exception {
    super.setup(ExecutorType.DIRECT,
        ExecutorType.DIRECT,
        MessageSize.SMALL,
        responseSize,
        clientInboundFlowWindow,
        ChannelType.NIO,
        maxConcurrentStreams,
        1);
    callCounter = new AtomicLong();
    completed = new AtomicBoolean();
    record = new AtomicBoolean();
    latch = startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed,
        responseSize.bytes());
  }

  /**
   * Stop the running calls then stop the server and client channels.
   *
   * @throws Exception if waiting for the calls is interrupted or the parent teardown fails
   */
  @Override
  @TearDown(Level.Trial)
  public void teardown() throws Exception {
    completed.set(true);
    try {
      if (!latch.await(5, TimeUnit.SECONDS)) {
        System.err.println("Failed to shutdown all calls.");
      }
    } finally {
      // Always run the parent teardown, even if the wait above was interrupted.
      super.teardown();
    }
  }

  /**
   * Measure bandwidth of streamed responses.
   *
   * @param counters auxiliary counters reported alongside the benchmark result
   * @throws Exception if the sleep is interrupted
   */
  @Benchmark
  public void stream(AdditionalCounters counters) throws Exception {
    record.set(true);
    try {
      // No need to do anything, just sleep here.
      Thread.sleep(1001);
    } finally {
      // Clear the flag even when interrupted so it never stays set outside this method.
      record.set(false);
    }
  }

  /**
   * Useful for triggering a subset of the benchmark in a profiler.
   *
   * @param argv ignored
   * @throws Exception if setup, the sleep, or teardown fails
   */
  public static void main(String[] argv) throws Exception {
    StreamingResponseBandwidthBenchmark bench = new StreamingResponseBandwidthBenchmark();
    bench.setup();
    try {
      Thread.sleep(30000);
    } finally {
      // Run teardown even if the sleep is interrupted.
      bench.teardown();
    }
    System.exit(0);
  }
}