1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.benchmarks;
18 
19 import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory;
20 
21 import com.google.common.util.concurrent.UncaughtExceptionHandlers;
22 import com.google.protobuf.ByteString;
23 import io.grpc.ManagedChannel;
24 import io.grpc.ManagedChannelBuilder;
25 import io.grpc.Status;
26 import io.grpc.benchmarks.proto.Messages;
27 import io.grpc.benchmarks.proto.Messages.Payload;
28 import io.grpc.benchmarks.proto.Messages.SimpleRequest;
29 import io.grpc.benchmarks.proto.Messages.SimpleResponse;
30 import io.grpc.internal.testing.TestUtils;
31 import io.grpc.netty.GrpcSslContexts;
32 import io.grpc.netty.NettyChannelBuilder;
33 import io.grpc.okhttp.OkHttpChannelBuilder;
34 import io.grpc.okhttp.internal.Platform;
35 import io.netty.channel.epoll.EpollDomainSocketChannel;
36 import io.netty.channel.epoll.EpollEventLoopGroup;
37 import io.netty.channel.epoll.EpollSocketChannel;
38 import io.netty.channel.nio.NioEventLoopGroup;
39 import io.netty.channel.socket.nio.NioSocketChannel;
40 import io.netty.channel.unix.DomainSocketAddress;
41 import io.netty.util.concurrent.DefaultThreadFactory;
42 import java.io.File;
43 import java.io.FileOutputStream;
44 import java.io.IOException;
45 import java.io.PrintStream;
46 import java.net.InetSocketAddress;
47 import java.net.ServerSocket;
48 import java.net.SocketAddress;
49 import java.util.concurrent.ExecutorService;
50 import java.util.concurrent.ForkJoinPool;
51 import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
52 import java.util.concurrent.ForkJoinWorkerThread;
53 import java.util.concurrent.atomic.AtomicInteger;
54 import javax.annotation.Nullable;
55 import org.HdrHistogram.Histogram;
56 
57 /**
58  * Utility methods to support benchmarking classes.
59  */
60 public final class Utils {
  /**
   * Prefix that marks an address string as a Unix Domain Socket path; checked by
   * {@link #parseSocketAddress(String)}.
   */
  private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://";

  /**
   * Upper bound for values recorded in a histogram. The histogram can record values between
   * 1 microsecond and 1 min (60,000,000 microseconds).
   */
  public static final long HISTOGRAM_MAX_VALUE = 60000000L;

  /**
   * Histogram precision setting. Value quantization will be no more than 1%. See the README of
   * HdrHistogram for more details.
   * NOTE(review): presumably passed as the "significant value digits" argument when histograms
   * are constructed -- confirm at the call sites, which are outside this file.
   */
  public static final int HISTOGRAM_PRECISION = 2;

  /** Default flow control window for benchmarks; mirrors the Netty channel builder's default. */
  public static final int DEFAULT_FLOW_CONTROL_WINDOW =
      NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW;

  /** Not instantiable: this class only exposes static helpers. */
  private Utils() {
  }
74 
parseBoolean(String value)75   public static boolean parseBoolean(String value) {
76     return value.isEmpty() || Boolean.parseBoolean(value);
77   }
78 
79   /**
80    * Parse a {@link SocketAddress} from the given string.
81    */
parseSocketAddress(String value)82   public static SocketAddress parseSocketAddress(String value) {
83     if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
84       // Unix Domain Socket address.
85       // Create the underlying file for the Unix Domain Socket.
86       String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
87       File file = new File(filePath);
88       if (!file.isAbsolute()) {
89         throw new IllegalArgumentException("File path must be absolute: " + filePath);
90       }
91       try {
92         if (file.createNewFile()) {
93           // If this application created the file, delete it when the application exits.
94           file.deleteOnExit();
95         }
96       } catch (IOException ex) {
97         throw new RuntimeException(ex);
98       }
99       // Create the SocketAddress referencing the file.
100       return new DomainSocketAddress(file);
101     } else {
102       // Standard TCP/IP address.
103       String[] parts = value.split(":", 2);
104       if (parts.length < 2) {
105         throw new IllegalArgumentException(
106             "Address must be a unix:// path or be in the form host:port. Got: " + value);
107       }
108       String host = parts[0];
109       int port = Integer.parseInt(parts[1]);
110       return new InetSocketAddress(host, port);
111     }
112   }
113 
newOkHttpClientChannel( SocketAddress address, boolean tls, boolean testca)114   private static OkHttpChannelBuilder newOkHttpClientChannel(
115       SocketAddress address, boolean tls, boolean testca) {
116     InetSocketAddress addr = (InetSocketAddress) address;
117     OkHttpChannelBuilder builder =
118         OkHttpChannelBuilder.forAddress(addr.getHostName(), addr.getPort());
119     if (!tls) {
120       builder.usePlaintext();
121     } else if (testca) {
122       try {
123         builder.sslSocketFactory(TestUtils.newSslSocketFactoryForCa(
124             Platform.get().getProvider(),
125             TestUtils.loadCert("ca.pem")));
126       } catch (Exception e) {
127         throw new RuntimeException(e);
128       }
129     }
130     return builder;
131   }
132 
newNettyClientChannel(Transport transport, SocketAddress address, boolean tls, boolean testca, int flowControlWindow)133   private static NettyChannelBuilder newNettyClientChannel(Transport transport,
134       SocketAddress address, boolean tls, boolean testca, int flowControlWindow)
135       throws IOException {
136     NettyChannelBuilder builder =
137         NettyChannelBuilder.forAddress(address).flowControlWindow(flowControlWindow);
138     if (!tls) {
139       builder.usePlaintext();
140     } else if (testca) {
141       File cert = TestUtils.loadCert("ca.pem");
142       builder.sslContext(GrpcSslContexts.forClient().trustManager(cert).build());
143     }
144 
145     DefaultThreadFactory tf = new DefaultThreadFactory("client-elg-", true /*daemon */);
146     switch (transport) {
147       case NETTY_NIO:
148         builder
149             .eventLoopGroup(new NioEventLoopGroup(0, tf))
150             .channelType(NioSocketChannel.class);
151         break;
152 
153       case NETTY_EPOLL:
154         // These classes only work on Linux.
155         builder
156             .eventLoopGroup(new EpollEventLoopGroup(0, tf))
157             .channelType(EpollSocketChannel.class);
158         break;
159 
160       case NETTY_UNIX_DOMAIN_SOCKET:
161         // These classes only work on Linux.
162         builder
163             .eventLoopGroup(new EpollEventLoopGroup(0, tf))
164             .channelType(EpollDomainSocketChannel.class);
165         break;
166 
167       default:
168         // Should never get here.
169         throw new IllegalArgumentException("Unsupported transport: " + transport);
170     }
171     return builder;
172   }
173 
174   private static ExecutorService clientExecutor;
175 
getExecutor()176   private static synchronized ExecutorService getExecutor() {
177     if (clientExecutor == null) {
178       clientExecutor = new ForkJoinPool(
179           Runtime.getRuntime().availableProcessors(),
180           new ForkJoinWorkerThreadFactory() {
181             final AtomicInteger num = new AtomicInteger();
182             @Override
183             public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
184               ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool);
185               thread.setDaemon(true);
186               thread.setName("grpc-client-app-" + "-" + num.getAndIncrement());
187               return thread;
188             }
189           }, UncaughtExceptionHandlers.systemExit(), true /* async */);
190     }
191     return clientExecutor;
192   }
193 
194   /**
195    * Create a {@link ManagedChannel} for the given parameters.
196    */
newClientChannel(Transport transport, SocketAddress address, boolean tls, boolean testca, @Nullable String authorityOverride, int flowControlWindow, boolean directExecutor)197   public static ManagedChannel newClientChannel(Transport transport, SocketAddress address,
198         boolean tls, boolean testca, @Nullable String authorityOverride,
199         int flowControlWindow, boolean directExecutor) {
200     ManagedChannelBuilder<?> builder;
201     if (transport == Transport.OK_HTTP) {
202       builder = newOkHttpClientChannel(address, tls, testca);
203     } else {
204       try {
205         builder = newNettyClientChannel(transport, address, tls, testca, flowControlWindow);
206       } catch (Exception e) {
207         throw new RuntimeException(e);
208       }
209     }
210     if (authorityOverride != null) {
211       builder.overrideAuthority(authorityOverride);
212     }
213 
214     if (directExecutor) {
215       builder.directExecutor();
216     } else {
217       // TODO(carl-mastrangelo): This should not be necessary.  I don't know where this should be
218       // put.  Move it somewhere else, or remove it if no longer necessary.
219       // See: https://github.com/grpc/grpc-java/issues/2119
220       builder.executor(getExecutor());
221     }
222 
223     return builder.build();
224   }
225 
226   /**
227    * Save a {@link Histogram} to a file.
228    */
saveHistogram(Histogram histogram, String filename)229   public static void saveHistogram(Histogram histogram, String filename) throws IOException {
230     File file;
231     PrintStream log = null;
232     try {
233       file = new File(filename);
234       if (file.exists() && !file.delete()) {
235         System.err.println("Failed deleting previous histogram file: " + file.getAbsolutePath());
236       }
237       log = new PrintStream(new FileOutputStream(file), false);
238       histogram.outputPercentileDistribution(log, 1.0);
239     } finally {
240       if (log != null) {
241         log.close();
242       }
243     }
244   }
245 
246   /**
247    * Construct a {@link SimpleResponse} for the given request.
248    */
makeResponse(SimpleRequest request)249   public static SimpleResponse makeResponse(SimpleRequest request) {
250     if (request.getResponseSize() > 0) {
251       if (!Messages.PayloadType.COMPRESSABLE.equals(request.getResponseType())) {
252         throw Status.INTERNAL.augmentDescription("Error creating payload.").asRuntimeException();
253       }
254 
255       ByteString body = ByteString.copyFrom(new byte[request.getResponseSize()]);
256       Messages.PayloadType type = request.getResponseType();
257 
258       Payload payload = Payload.newBuilder().setType(type).setBody(body).build();
259       return SimpleResponse.newBuilder().setPayload(payload).build();
260     }
261     return SimpleResponse.getDefaultInstance();
262   }
263 
264   /**
265    * Construct a {@link SimpleRequest} with the specified dimensions.
266    */
makeRequest(Messages.PayloadType payloadType, int reqLength, int respLength)267   public static SimpleRequest makeRequest(Messages.PayloadType payloadType, int reqLength,
268                                           int respLength) {
269     ByteString body = ByteString.copyFrom(new byte[reqLength]);
270     Payload payload = Payload.newBuilder()
271         .setType(payloadType)
272         .setBody(body)
273         .build();
274 
275     return SimpleRequest.newBuilder()
276         .setResponseType(payloadType)
277         .setResponseSize(respLength)
278         .setPayload(payload)
279         .build();
280   }
281 
282   /**
283    * Picks a port that is not used right at this moment.
284    * Warning: Not thread safe. May see "BindException: Address already in use: bind" if using the
285    * returned port to create a new server socket when other threads/processes are concurrently
286    * creating new sockets without a specific port.
287    */
pickUnusedPort()288   public static int pickUnusedPort() {
289     try {
290       ServerSocket serverSocket = new ServerSocket(0);
291       int port = serverSocket.getLocalPort();
292       serverSocket.close();
293       return port;
294     } catch (IOException e) {
295       throw new RuntimeException(e);
296     }
297   }
298 }
299