• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.benchmarks.driver;
18 
19 import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory;
20 
21 import com.google.common.util.concurrent.UncaughtExceptionHandlers;
22 import com.sun.management.OperatingSystemMXBean;
23 import io.grpc.Metadata;
24 import io.grpc.MethodDescriptor;
25 import io.grpc.MethodDescriptor.Marshaller;
26 import io.grpc.Server;
27 import io.grpc.ServerBuilder;
28 import io.grpc.ServerCall;
29 import io.grpc.ServerCallHandler;
30 import io.grpc.ServerServiceDefinition;
31 import io.grpc.ServiceDescriptor;
32 import io.grpc.Status;
33 import io.grpc.benchmarks.ByteBufOutputMarshaller;
34 import io.grpc.benchmarks.Utils;
35 import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
36 import io.grpc.benchmarks.proto.Control;
37 import io.grpc.benchmarks.proto.Stats;
38 import io.grpc.benchmarks.qps.AsyncServer;
39 import io.grpc.internal.testing.TestUtils;
40 import io.netty.buffer.ByteBuf;
41 import io.netty.buffer.PooledByteBufAllocator;
42 import java.io.File;
43 import java.lang.management.ManagementFactory;
44 import java.util.List;
45 import java.util.concurrent.ExecutorService;
46 import java.util.concurrent.ForkJoinPool;
47 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
48 import java.util.concurrent.ForkJoinWorkerThread;
49 import java.util.concurrent.atomic.AtomicInteger;
50 import java.util.logging.Level;
51 import java.util.logging.Logger;
52 
53 /**
54  * Implements the server-side contract for the load testing scenarios.
55  */
56 final class LoadServer {
57 
58   private static final Marshaller<ByteBuf> marshaller = new ByteBufOutputMarshaller();
59   /**
60    * Generic version of the unary method call.
61    */
62   static final MethodDescriptor<ByteBuf, ByteBuf> GENERIC_UNARY_METHOD =
63       BenchmarkServiceGrpc.getUnaryCallMethod().toBuilder(marshaller, marshaller)
64           .build();
65 
66   /**
67    * Generic version of the streaming ping-pong method call.
68    */
69   static final MethodDescriptor<ByteBuf, ByteBuf> GENERIC_STREAMING_PING_PONG_METHOD =
70       BenchmarkServiceGrpc.getStreamingCallMethod().toBuilder(marshaller, marshaller)
71           .build();
72 
73   private static final Logger log = Logger.getLogger(LoadServer.class.getName());
74 
75   private final Server server;
76   private final AsyncServer.BenchmarkServiceImpl benchmarkService;
77   private final OperatingSystemMXBean osBean;
78   private final int port;
79   private ByteBuf genericResponse;
80   private long lastStatTime;
81   private long lastMarkCpuTime;
82 
LoadServer(Control.ServerConfig config)83   LoadServer(Control.ServerConfig config) throws Exception {
84     log.log(Level.INFO, "Server Config \n" + config.toString());
85     port = config.getPort() ==  0 ? Utils.pickUnusedPort() : config.getPort();
86     ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
87     int asyncThreads = config.getAsyncServerThreads() == 0
88         ? Runtime.getRuntime().availableProcessors()
89         : config.getAsyncServerThreads();
90     // The concepts of sync & async server are quite different in the C impl and the names
91     // chosen for the enum are based on that implementation. We use 'sync' to mean
92     // the direct executor case in Java even though the service implementations are always
93     // fully async.
94     switch (config.getServerType()) {
95       case ASYNC_SERVER: {
96         serverBuilder.executor(getExecutor(asyncThreads));
97         break;
98       }
99       case SYNC_SERVER: {
100         serverBuilder.directExecutor();
101         break;
102       }
103       case ASYNC_GENERIC_SERVER: {
104         serverBuilder.executor(getExecutor(asyncThreads));
105         // Create buffers for the generic service
106         PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
107         genericResponse = alloc.buffer(config.getPayloadConfig().getBytebufParams().getRespSize());
108         if (genericResponse.capacity() > 0) {
109           genericResponse.writerIndex(genericResponse.capacity() - 1);
110         }
111         break;
112       }
113       default: {
114         throw new IllegalArgumentException();
115       }
116     }
117     if (config.hasSecurityParams()) {
118       File cert = TestUtils.loadCert("server1.pem");
119       File key = TestUtils.loadCert("server1.key");
120       serverBuilder.useTransportSecurity(cert, key);
121     }
122     benchmarkService = new AsyncServer.BenchmarkServiceImpl();
123     if (config.getServerType() == Control.ServerType.ASYNC_GENERIC_SERVER) {
124       serverBuilder.addService(
125           ServerServiceDefinition
126               .builder(new ServiceDescriptor(BenchmarkServiceGrpc.SERVICE_NAME,
127                   GENERIC_STREAMING_PING_PONG_METHOD))
128               .addMethod(GENERIC_STREAMING_PING_PONG_METHOD, new GenericServiceCallHandler())
129               .build());
130     } else {
131       serverBuilder.addService(benchmarkService);
132     }
133     server = serverBuilder.build();
134 
135     List<OperatingSystemMXBean> beans =
136         ManagementFactory.getPlatformMXBeans(OperatingSystemMXBean.class);
137     if (!beans.isEmpty()) {
138       osBean = beans.get(0);
139     } else {
140       osBean = null;
141     }
142   }
143 
getExecutor(int asyncThreads)144   ExecutorService getExecutor(int asyncThreads) {
145     // TODO(carl-mastrangelo): This should not be necessary.  I don't know where this should be
146     // put.  Move it somewhere else, or remove it if no longer necessary.
147     // See: https://github.com/grpc/grpc-java/issues/2119
148     return new ForkJoinPool(asyncThreads,
149         new ForkJoinWorkerThreadFactory() {
150           final AtomicInteger num = new AtomicInteger();
151           @Override
152           public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
153             ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool);
154             thread.setDaemon(true);
155             thread.setName("server-worker-" + "-" + num.getAndIncrement());
156             return thread;
157           }
158         }, UncaughtExceptionHandlers.systemExit(), true /* async */);
159   }
160 
getPort()161   int getPort() {
162     return port;
163   }
164 
getCores()165   int getCores() {
166     return Runtime.getRuntime().availableProcessors();
167   }
168 
start()169   void start() throws Exception {
170     server.start();
171     lastStatTime = System.nanoTime();
172     if (osBean != null) {
173       lastMarkCpuTime = osBean.getProcessCpuTime();
174     }
175   }
176 
getStats()177   Stats.ServerStats getStats() {
178     Stats.ServerStats.Builder builder = Stats.ServerStats.newBuilder();
179     long now = System.nanoTime();
180     double elapsed = ((double) now - lastStatTime) / 1000000000.0;
181     lastStatTime = now;
182     builder.setTimeElapsed(elapsed);
183     if (osBean != null) {
184       // Report all the CPU time as user-time  (which is intentionally incorrect)
185       long nowCpu = osBean.getProcessCpuTime();
186       builder.setTimeUser(((double) nowCpu - lastMarkCpuTime) / 1000000000.0);
187       lastMarkCpuTime = nowCpu;
188     }
189     return builder.build();
190   }
191 
shutdownNow()192   void shutdownNow() {
193     benchmarkService.shutdown();
194     server.shutdownNow();
195   }
196 
197   private class GenericServiceCallHandler implements ServerCallHandler<ByteBuf, ByteBuf> {
198 
199     @Override
startCall( final ServerCall<ByteBuf, ByteBuf> call, Metadata headers)200     public ServerCall.Listener<ByteBuf> startCall(
201         final ServerCall<ByteBuf, ByteBuf> call, Metadata headers) {
202       call.sendHeaders(new Metadata());
203       call.request(1);
204       return new ServerCall.Listener<ByteBuf>() {
205         @Override
206         public void onMessage(ByteBuf message) {
207           // no-op
208           message.release();
209           call.request(1);
210           call.sendMessage(genericResponse.slice());
211         }
212 
213         @Override
214         public void onHalfClose() {
215           call.close(Status.OK, new Metadata());
216         }
217 
218         @Override
219         public void onCancel() {
220         }
221 
222         @Override
223         public void onComplete() {
224         }
225       };
226     }
227   }
228 }
229