1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.benchmarks.driver; 18 19 import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory; 20 21 import com.google.common.util.concurrent.UncaughtExceptionHandlers; 22 import com.sun.management.OperatingSystemMXBean; 23 import io.grpc.Metadata; 24 import io.grpc.MethodDescriptor; 25 import io.grpc.MethodDescriptor.Marshaller; 26 import io.grpc.Server; 27 import io.grpc.ServerBuilder; 28 import io.grpc.ServerCall; 29 import io.grpc.ServerCallHandler; 30 import io.grpc.ServerServiceDefinition; 31 import io.grpc.ServiceDescriptor; 32 import io.grpc.Status; 33 import io.grpc.benchmarks.ByteBufOutputMarshaller; 34 import io.grpc.benchmarks.Utils; 35 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc; 36 import io.grpc.benchmarks.proto.Control; 37 import io.grpc.benchmarks.proto.Stats; 38 import io.grpc.benchmarks.qps.AsyncServer; 39 import io.grpc.internal.testing.TestUtils; 40 import io.netty.buffer.ByteBuf; 41 import io.netty.buffer.PooledByteBufAllocator; 42 import java.io.File; 43 import java.lang.management.ManagementFactory; 44 import java.util.List; 45 import java.util.concurrent.ExecutorService; 46 import java.util.concurrent.ForkJoinPool; 47 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; 48 import java.util.concurrent.ForkJoinWorkerThread; 49 import java.util.concurrent.atomic.AtomicInteger; 50 import java.util.logging.Level; 51 import java.util.logging.Logger; 52 53 /** 54 * Implements the server-side contract for the load testing scenarios. 55 */ 56 final class LoadServer { 57 58 private static final Marshaller<ByteBuf> marshaller = new ByteBufOutputMarshaller(); 59 /** 60 * Generic version of the unary method call. 61 */ 62 static final MethodDescriptor<ByteBuf, ByteBuf> GENERIC_UNARY_METHOD = 63 BenchmarkServiceGrpc.getUnaryCallMethod().toBuilder(marshaller, marshaller) 64 .build(); 65 66 /** 67 * Generic version of the streaming ping-pong method call. 68 */ 69 static final MethodDescriptor<ByteBuf, ByteBuf> GENERIC_STREAMING_PING_PONG_METHOD = 70 BenchmarkServiceGrpc.getStreamingCallMethod().toBuilder(marshaller, marshaller) 71 .build(); 72 73 private static final Logger log = Logger.getLogger(LoadServer.class.getName()); 74 75 private final Server server; 76 private final AsyncServer.BenchmarkServiceImpl benchmarkService; 77 private final OperatingSystemMXBean osBean; 78 private final int port; 79 private ByteBuf genericResponse; 80 private long lastStatTime; 81 private long lastMarkCpuTime; 82 LoadServer(Control.ServerConfig config)83 LoadServer(Control.ServerConfig config) throws Exception { 84 log.log(Level.INFO, "Server Config \n" + config.toString()); 85 port = config.getPort() == 0 ? Utils.pickUnusedPort() : config.getPort(); 86 ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port); 87 int asyncThreads = config.getAsyncServerThreads() == 0 88 ? Runtime.getRuntime().availableProcessors() 89 : config.getAsyncServerThreads(); 90 // The concepts of sync & async server are quite different in the C impl and the names 91 // chosen for the enum are based on that implementation. We use 'sync' to mean 92 // the direct executor case in Java even though the service implementations are always 93 // fully async. 94 switch (config.getServerType()) { 95 case ASYNC_SERVER: { 96 serverBuilder.executor(getExecutor(asyncThreads)); 97 break; 98 } 99 case SYNC_SERVER: { 100 serverBuilder.directExecutor(); 101 break; 102 } 103 case ASYNC_GENERIC_SERVER: { 104 serverBuilder.executor(getExecutor(asyncThreads)); 105 // Create buffers for the generic service 106 PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT; 107 genericResponse = alloc.buffer(config.getPayloadConfig().getBytebufParams().getRespSize()); 108 if (genericResponse.capacity() > 0) { 109 genericResponse.writerIndex(genericResponse.capacity() - 1); 110 } 111 break; 112 } 113 default: { 114 throw new IllegalArgumentException(); 115 } 116 } 117 if (config.hasSecurityParams()) { 118 File cert = TestUtils.loadCert("server1.pem"); 119 File key = TestUtils.loadCert("server1.key"); 120 serverBuilder.useTransportSecurity(cert, key); 121 } 122 benchmarkService = new AsyncServer.BenchmarkServiceImpl(); 123 if (config.getServerType() == Control.ServerType.ASYNC_GENERIC_SERVER) { 124 serverBuilder.addService( 125 ServerServiceDefinition 126 .builder(new ServiceDescriptor(BenchmarkServiceGrpc.SERVICE_NAME, 127 GENERIC_STREAMING_PING_PONG_METHOD)) 128 .addMethod(GENERIC_STREAMING_PING_PONG_METHOD, new GenericServiceCallHandler()) 129 .build()); 130 } else { 131 serverBuilder.addService(benchmarkService); 132 } 133 server = serverBuilder.build(); 134 135 List<OperatingSystemMXBean> beans = 136 ManagementFactory.getPlatformMXBeans(OperatingSystemMXBean.class); 137 if (!beans.isEmpty()) { 138 osBean = beans.get(0); 139 } else { 140 osBean = null; 141 } 142 } 143 getExecutor(int asyncThreads)144 ExecutorService getExecutor(int asyncThreads) { 145 // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be 146 // put. Move it somewhere else, or remove it if no longer necessary. 147 // See: https://github.com/grpc/grpc-java/issues/2119 148 return new ForkJoinPool(asyncThreads, 149 new ForkJoinWorkerThreadFactory() { 150 final AtomicInteger num = new AtomicInteger(); 151 @Override 152 public ForkJoinWorkerThread newThread(ForkJoinPool pool) { 153 ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool); 154 thread.setDaemon(true); 155 thread.setName("server-worker-" + "-" + num.getAndIncrement()); 156 return thread; 157 } 158 }, UncaughtExceptionHandlers.systemExit(), true /* async */); 159 } 160 getPort()161 int getPort() { 162 return port; 163 } 164 getCores()165 int getCores() { 166 return Runtime.getRuntime().availableProcessors(); 167 } 168 start()169 void start() throws Exception { 170 server.start(); 171 lastStatTime = System.nanoTime(); 172 if (osBean != null) { 173 lastMarkCpuTime = osBean.getProcessCpuTime(); 174 } 175 } 176 getStats()177 Stats.ServerStats getStats() { 178 Stats.ServerStats.Builder builder = Stats.ServerStats.newBuilder(); 179 long now = System.nanoTime(); 180 double elapsed = ((double) now - lastStatTime) / 1000000000.0; 181 lastStatTime = now; 182 builder.setTimeElapsed(elapsed); 183 if (osBean != null) { 184 // Report all the CPU time as user-time (which is intentionally incorrect) 185 long nowCpu = osBean.getProcessCpuTime(); 186 builder.setTimeUser(((double) nowCpu - lastMarkCpuTime) / 1000000000.0); 187 lastMarkCpuTime = nowCpu; 188 } 189 return builder.build(); 190 } 191 shutdownNow()192 void shutdownNow() { 193 benchmarkService.shutdown(); 194 server.shutdownNow(); 195 } 196 197 private class GenericServiceCallHandler implements ServerCallHandler<ByteBuf, ByteBuf> { 198 199 @Override startCall( final ServerCall<ByteBuf, ByteBuf> call, Metadata headers)200 public ServerCall.Listener<ByteBuf> startCall( 201 final ServerCall<ByteBuf, ByteBuf> call, Metadata headers) { 202 call.sendHeaders(new Metadata()); 203 call.request(1); 204 return new ServerCall.Listener<ByteBuf>() { 205 @Override 206 public void onMessage(ByteBuf message) { 207 // no-op 208 message.release(); 209 call.request(1); 210 call.sendMessage(genericResponse.slice()); 211 } 212 213 @Override 214 public void onHalfClose() { 215 call.close(Status.OK, new Metadata()); 216 } 217 218 @Override 219 public void onCancel() { 220 } 221 222 @Override 223 public void onComplete() { 224 } 225 }; 226 } 227 } 228 } 229