1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.okhttp;
18 
19 import com.google.common.base.Preconditions;
20 import io.grpc.internal.SerializingExecutor;
21 import io.grpc.okhttp.internal.framed.ErrorCode;
22 import io.grpc.okhttp.internal.framed.FrameWriter;
23 import io.grpc.okhttp.internal.framed.Header;
24 import io.grpc.okhttp.internal.framed.Settings;
25 import java.io.IOException;
26 import java.net.Socket;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicLong;
29 import java.util.logging.Level;
30 import java.util.logging.Logger;
31 import okio.Buffer;
32 
33 class AsyncFrameWriter implements FrameWriter {
34   private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
35   private FrameWriter frameWriter;
36   private Socket socket;
37   // Although writes are thread-safe, we serialize them to prevent consuming many Threads that are
38   // just waiting on each other.
39   private final SerializingExecutor executor;
40   private final TransportExceptionHandler transportExceptionHandler;
41   private final AtomicLong flushCounter = new AtomicLong();
42 
AsyncFrameWriter( TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor)43   public AsyncFrameWriter(
44       TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor) {
45     this.transportExceptionHandler = transportExceptionHandler;
46     this.executor = executor;
47   }
48 
49   /**
50    * Set the real frameWriter and the corresponding underlying socket, the socket is needed for
51    * closing.
52    *
53    * <p>should only be called by thread of executor.
54    */
becomeConnected(FrameWriter frameWriter, Socket socket)55   void becomeConnected(FrameWriter frameWriter, Socket socket) {
56     Preconditions.checkState(this.frameWriter == null,
57         "AsyncFrameWriter's setFrameWriter() should only be called once.");
58     this.frameWriter = Preconditions.checkNotNull(frameWriter, "frameWriter");
59     this.socket = Preconditions.checkNotNull(socket, "socket");
60   }
61 
62   @Override
connectionPreface()63   public void connectionPreface() {
64     executor.execute(new WriteRunnable() {
65       @Override
66       public void doRun() throws IOException {
67         frameWriter.connectionPreface();
68       }
69     });
70   }
71 
72   @Override
ackSettings(final Settings peerSettings)73   public void ackSettings(final Settings peerSettings) {
74     executor.execute(new WriteRunnable() {
75       @Override
76       public void doRun() throws IOException {
77         frameWriter.ackSettings(peerSettings);
78       }
79     });
80   }
81 
82   @Override
pushPromise(final int streamId, final int promisedStreamId, final List<Header> requestHeaders)83   public void pushPromise(final int streamId, final int promisedStreamId,
84       final List<Header> requestHeaders) {
85     executor.execute(new WriteRunnable() {
86       @Override
87       public void doRun() throws IOException {
88         frameWriter.pushPromise(streamId, promisedStreamId, requestHeaders);
89       }
90     });
91   }
92 
93   @Override
flush()94   public void flush() {
95     // keep track of version of flushes to skip flush if another flush task is queued.
96     final long flushCount = flushCounter.incrementAndGet();
97 
98     executor.execute(new WriteRunnable() {
99       @Override
100       public void doRun() throws IOException {
101         // There can be a flush starvation if there are continuous flood of flush is queued, this
102         // is not an issue with OkHttp since it flushes if the buffer is full.
103         if (flushCounter.get() == flushCount) {
104           frameWriter.flush();
105         }
106       }
107     });
108   }
109 
110   @Override
synStream(final boolean outFinished, final boolean inFinished, final int streamId, final int associatedStreamId, final List<Header> headerBlock)111   public void synStream(final boolean outFinished, final boolean inFinished, final int streamId,
112       final int associatedStreamId, final List<Header> headerBlock) {
113     executor.execute(new WriteRunnable() {
114       @Override
115       public void doRun() throws IOException {
116         frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, headerBlock);
117       }
118     });
119   }
120 
121   @Override
synReply(final boolean outFinished, final int streamId, final List<Header> headerBlock)122   public void synReply(final boolean outFinished, final int streamId,
123       final List<Header> headerBlock) {
124     executor.execute(new WriteRunnable() {
125       @Override
126       public void doRun() throws IOException {
127         frameWriter.synReply(outFinished, streamId, headerBlock);
128       }
129     });
130   }
131 
132   @Override
headers(final int streamId, final List<Header> headerBlock)133   public void headers(final int streamId, final List<Header> headerBlock) {
134     executor.execute(new WriteRunnable() {
135       @Override
136       public void doRun() throws IOException {
137         frameWriter.headers(streamId, headerBlock);
138       }
139     });
140   }
141 
142   @Override
rstStream(final int streamId, final ErrorCode errorCode)143   public void rstStream(final int streamId, final ErrorCode errorCode) {
144     executor.execute(new WriteRunnable() {
145       @Override
146       public void doRun() throws IOException {
147         frameWriter.rstStream(streamId, errorCode);
148       }
149     });
150   }
151 
152   @Override
data(final boolean outFinished, final int streamId, final Buffer source, final int byteCount)153   public void data(final boolean outFinished, final int streamId, final Buffer source,
154       final int byteCount) {
155     executor.execute(new WriteRunnable() {
156       @Override
157       public void doRun() throws IOException {
158         frameWriter.data(outFinished, streamId, source, byteCount);
159       }
160     });
161   }
162 
163   @Override
settings(final Settings okHttpSettings)164   public void settings(final Settings okHttpSettings) {
165     executor.execute(new WriteRunnable() {
166       @Override
167       public void doRun() throws IOException {
168         frameWriter.settings(okHttpSettings);
169       }
170     });
171   }
172 
173   @Override
ping(final boolean ack, final int payload1, final int payload2)174   public void ping(final boolean ack, final int payload1, final int payload2) {
175     executor.execute(new WriteRunnable() {
176       @Override
177       public void doRun() throws IOException {
178         frameWriter.ping(ack, payload1, payload2);
179       }
180     });
181   }
182 
183   @Override
goAway(final int lastGoodStreamId, final ErrorCode errorCode, final byte[] debugData)184   public void goAway(final int lastGoodStreamId, final ErrorCode errorCode,
185       final byte[] debugData) {
186     executor.execute(new WriteRunnable() {
187       @Override
188       public void doRun() throws IOException {
189         frameWriter.goAway(lastGoodStreamId, errorCode, debugData);
190         // Flush it since after goAway, we are likely to close this writer.
191         frameWriter.flush();
192       }
193     });
194   }
195 
196   @Override
windowUpdate(final int streamId, final long windowSizeIncrement)197   public void windowUpdate(final int streamId, final long windowSizeIncrement) {
198     executor.execute(new WriteRunnable() {
199       @Override
200       public void doRun() throws IOException {
201         frameWriter.windowUpdate(streamId, windowSizeIncrement);
202       }
203     });
204   }
205 
206   @Override
close()207   public void close() {
208     executor.execute(new Runnable() {
209       @Override
210       public void run() {
211         if (frameWriter != null) {
212           try {
213             frameWriter.close();
214             socket.close();
215           } catch (IOException e) {
216             log.log(Level.WARNING, "Failed closing connection", e);
217           }
218         }
219       }
220     });
221   }
222 
223   private abstract class WriteRunnable implements Runnable {
224     @Override
run()225     public final void run() {
226       try {
227         if (frameWriter == null) {
228           throw new IOException("Unable to perform write due to unavailable frameWriter.");
229         }
230         doRun();
231       } catch (RuntimeException e) {
232         transportExceptionHandler.onException(e);
233       } catch (Exception e) {
234         transportExceptionHandler.onException(e);
235       }
236     }
237 
doRun()238     public abstract void doRun() throws IOException;
239   }
240 
241   @Override
maxDataLength()242   public int maxDataLength() {
243     return frameWriter == null ? 0x4000 /* 16384, the minimum required by the HTTP/2 spec */
244         : frameWriter.maxDataLength();
245   }
246 
247   /** A class that handles transport exception. */
248   interface TransportExceptionHandler {
249 
250     /** Handles exception. */
onException(Throwable throwable)251     void onException(Throwable throwable);
252   }
253 }
254