1 /*
2  * Copyright (C) 2014 Square, Inc. and others.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.squareup.okio.benchmarks;
18 
19 import java.io.EOFException;
20 import java.io.File;
21 import java.io.FileInputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.util.concurrent.TimeUnit;
25 
26 import org.openjdk.jmh.annotations.Benchmark;
27 import org.openjdk.jmh.annotations.BenchmarkMode;
28 import org.openjdk.jmh.annotations.Fork;
29 import org.openjdk.jmh.annotations.Group;
30 import org.openjdk.jmh.annotations.GroupThreads;
31 import org.openjdk.jmh.annotations.Level;
32 import org.openjdk.jmh.annotations.Measurement;
33 import org.openjdk.jmh.annotations.Mode;
34 import org.openjdk.jmh.annotations.OutputTimeUnit;
35 import org.openjdk.jmh.annotations.Param;
36 import org.openjdk.jmh.annotations.Scope;
37 import org.openjdk.jmh.annotations.Setup;
38 import org.openjdk.jmh.annotations.State;
39 import org.openjdk.jmh.annotations.TearDown;
40 import org.openjdk.jmh.annotations.Threads;
41 import org.openjdk.jmh.annotations.Warmup;
42 
43 import okio.Buffer;
44 import okio.BufferedSource;
45 import okio.Okio;
46 import okio.Sink;
47 import okio.Timeout;
48 
49 import static java.util.Objects.requireNonNull;
50 
51 @Fork(1)
52 @Warmup(iterations = 10, time = 10)
53 @Measurement(iterations = 10, time = 10)
54 @State(Scope.Benchmark)
55 @BenchmarkMode(Mode.Throughput)
56 @OutputTimeUnit(TimeUnit.SECONDS)
57 public class BufferPerformanceBench {
58 
59   public static final File OriginPath =
60       new File(System.getProperty("okio.bench.origin.path", "/dev/urandom"));
61 
  /* Test Workload
   *
   * Each benchmark thread maintains three buffers: a receive buffer, a process buffer
   * and a send buffer. On every operation:
   *
   *   - We fill up the receive buffer with the request bytes (precomputed from the origin),
   *     write the request to the process buffer, and consume the process buffer.
   *   - We fill up the process buffer with the response bytes (precomputed from the origin),
   *     write the response to the send buffer, and consume the send buffer.
   *
   * We use an "origin" source that serves as a preexisting sequence of bytes we can read
   * from the file system. The request and response bytes are read from the origin once at
   * the beginning and reused throughout the benchmark in order to eliminate GC effects.
   *
   * Typically, we simulate the usage of small reads and large writes. Requests and
   * responses are satisfied with precomputed buffers to eliminate GC effects on
   * results.
   *
   * There are two types of benchmark tests: hot tests are "pedal to the metal" and
   * use all the CPU they can take. These are useful to magnify performance effects of
   * changes but are not realistic use cases that should drive optimization efforts.
   * Cold tests introduce think time between receiving the request and sending
   * the response. They are more useful as a reasonably realistic workload where
   * buffers can be read from and written to during request/response handling, but
   * they may hide subtle effects of most changes on performance. Prefer to look at the cold
   * benchmarks first to decide if a bottleneck is worth pursuing, then use the hot
   * benchmarks to fine tune optimization efforts.
   *
   * Benchmark threads do not explicitly communicate with each other (except to sync
   * iterations as needed by JMH).
   *
   * We simulate think time for cold benchmark threads (those using ColdBuffers) by
   * sleeping for a configurable number of microseconds (1000 by default) before each
   * invocation.
   */
96 
97 
98   @Benchmark
99   @Threads(1)
threads1hot(HotBuffers buffers)100   public void threads1hot(HotBuffers buffers) throws IOException {
101     readWriteRecycle(buffers);
102   }
103 
104   @Benchmark
105   @Threads(2)
threads2hot(HotBuffers buffers)106   public void threads2hot(HotBuffers buffers) throws IOException {
107     readWriteRecycle(buffers);
108   }
109 
110   @Benchmark
111   @Threads(4)
threads4hot(HotBuffers buffers)112   public void threads4hot(HotBuffers buffers) throws IOException {
113     readWriteRecycle(buffers);
114   }
115 
116   @Benchmark
117   @Threads(8)
threads8hot(HotBuffers buffers)118   public void threads8hot(HotBuffers buffers) throws IOException {
119     readWriteRecycle(buffers);
120   }
121 
122   @Benchmark
123   @Threads(16)
threads16hot(HotBuffers buffers)124   public void threads16hot(HotBuffers buffers) throws IOException {
125     readWriteRecycle(buffers);
126   }
127 
128   @Benchmark
129   @Threads(32)
threads32hot(HotBuffers buffers)130   public void threads32hot(HotBuffers buffers) throws IOException {
131     readWriteRecycle(buffers);
132   }
133 
134   @Benchmark
135   @GroupThreads(1)
136   @Group("cold")
thinkReadHot(HotBuffers buffers)137   public void thinkReadHot(HotBuffers buffers) throws IOException {
138     buffers.receive(requestBytes).readAll(NullSink);
139   }
140 
141   @Benchmark
142   @GroupThreads(3)
143   @Group("cold")
thinkWriteCold(ColdBuffers buffers)144   public void thinkWriteCold(ColdBuffers buffers) throws IOException {
145     buffers.transmit(responseBytes).readAll(NullSink);
146   }
147 
readWriteRecycle(HotBuffers buffers)148   private void readWriteRecycle(HotBuffers buffers) throws IOException {
149     buffers.receive(requestBytes).readAll(NullSink);
150     buffers.transmit(responseBytes).readAll(NullSink);
151   }
152 
153   @Param({ "1000" })
154   int maxThinkMicros = 1000;
155 
156   @Param({ "1024" })
157   int maxReadBytes = 1024;
158 
159   @Param({ "1024" })
160   int maxWriteBytes = 1024;
161 
162   @Param({ "2048" })
163   int requestSize = 2048;
164 
165   @Param({ "1" })
166   int responseFactor = 1;
167 
168   byte[] requestBytes;
169 
170   byte[] responseBytes;
171 
172   @Setup(Level.Trial)
storeRequestResponseData()173   public void storeRequestResponseData() throws IOException {
174     checkOrigin(OriginPath);
175 
176     requestBytes = storeSourceData(new byte[requestSize]);
177     responseBytes = storeSourceData(new byte[requestSize * responseFactor]);
178   }
179 
storeSourceData(byte[] dest)180   private byte[] storeSourceData(byte[] dest) throws IOException {
181     requireNonNull(dest, "dest == null");
182     try (BufferedSource source = Okio.buffer(Okio.source(OriginPath))) {
183       source.readFully(dest);
184     }
185     return dest;
186   }
187 
checkOrigin(File path)188   private void checkOrigin(File path) throws IOException {
189     requireNonNull(path, "path == null");
190 
191     if (!path.canRead()) {
192       throw new IllegalArgumentException("can not access: " + path);
193     }
194 
195     try (InputStream in = new FileInputStream(path)) {
196       int available = in.read();
197       if (available < 0) {
198         throw new IllegalArgumentException("can not read: " + path);
199       }
200     }
201   }
202 
203   /*
204    * The state class hierarchy is larger than it needs to be due to a JMH
205    * issue where states inheriting setup methods depending on another state
206    * do not get initialized correctly from benchmark methods making use
207    * of groups. To work around, we leave the common setup and teardown code
208    * in superclasses and move the setup method depending on the bench state
209    * to subclasses. Without the workaround, it would have been enough for
210    * `ColdBuffers` to inherit from `HotBuffers`.
211    */
212 
213   @State(Scope.Thread)
214   public static class ColdBuffers extends BufferSetup {
215 
216     @Setup(Level.Trial)
setupBench(BufferPerformanceBench bench)217     public void setupBench(BufferPerformanceBench bench) {
218       super.bench = bench;
219     }
220 
221     @Setup(Level.Invocation)
lag()222     public void lag() throws InterruptedException {
223       TimeUnit.MICROSECONDS.sleep(bench.maxThinkMicros);
224     }
225 
226   }
227 
228   @State(Scope.Thread)
229   public static class HotBuffers extends BufferSetup {
230 
231     @Setup(Level.Trial)
setupBench(BufferPerformanceBench bench)232     public void setupBench(BufferPerformanceBench bench) {
233       super.bench = bench;
234     }
235 
236   }
237 
238   @State(Scope.Thread)
239   public static abstract class BufferSetup extends BufferState {
240     BufferPerformanceBench bench;
241 
receive(byte[] bytes)242     public BufferedSource receive(byte[] bytes) throws IOException {
243       return super.receive(bytes, bench.maxReadBytes);
244     }
245 
transmit(byte[] bytes)246     public BufferedSource transmit(byte[] bytes) throws IOException {
247       return super.transmit(bytes, bench.maxWriteBytes);
248     }
249 
250     @TearDown
dispose()251     public void dispose() throws IOException {
252       releaseBuffers();
253     }
254 
255   }
256 
257   public static class BufferState {
258 
259     @SuppressWarnings("resource")
260     final Buffer received = new Buffer();
261     @SuppressWarnings("resource")
262     final Buffer sent = new Buffer();
263     @SuppressWarnings("resource")
264     final Buffer process = new Buffer();
265 
releaseBuffers()266     public void releaseBuffers() throws IOException {
267       received.clear();
268       sent.clear();
269       process.clear();
270     }
271 
272     /**
273      * Fills up the receive buffer, hands off to process buffer and returns it for consuming.
274      * Expects receive and process buffers to be empty. Leaves the receive buffer empty and
275      * process buffer full.
276      */
receive(byte[] bytes, int maxChunkSize)277     protected Buffer receive(byte[] bytes, int maxChunkSize) throws IOException {
278       writeChunked(received, bytes, maxChunkSize).readAll(process);
279       return process;
280     }
281 
282     /**
283      * Fills up the process buffer, hands off to send buffer and returns it for consuming.
284      * Expects process and sent buffers to be empty. Leaves the process buffer empty and
285      * sent buffer full.
286      */
transmit(byte[] bytes, int maxChunkSize)287     protected BufferedSource transmit(byte[] bytes, int maxChunkSize) throws IOException {
288       writeChunked(process, bytes, maxChunkSize).readAll(sent);
289       return sent;
290     }
291 
writeChunked(Buffer buffer, byte[] bytes, final int chunkSize)292     private BufferedSource writeChunked(Buffer buffer, byte[] bytes, final int chunkSize) {
293       int remaining = bytes.length;
294       int offset = 0;
295       while (remaining > 0) {
296         int bytesToWrite = Math.min(remaining, chunkSize);
297         buffer.write(bytes, offset, bytesToWrite);
298         remaining -= bytesToWrite;
299         offset += bytesToWrite;
300       }
301       return buffer;
302     }
303 
304   }
305 
306   @SuppressWarnings("resource")
307   private static final Sink NullSink = new Sink() {
308 
309     @Override public void write(Buffer source, long byteCount) throws EOFException {
310       source.skip(byteCount);
311     }
312 
313     @Override public void flush() {
314       // nothing
315     }
316 
317     @Override public Timeout timeout() {
318       return Timeout.NONE;
319     }
320 
321     @Override public void close() {
322       // nothing
323     }
324 
325     @Override public String toString() {
326       return "NullSink{}";
327     }
328   };
329 
330 }
331