1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.MoreObjects;
23 import com.google.common.base.Preconditions;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.common.util.concurrent.SettableFuture;
26 import io.grpc.Attributes;
27 import io.grpc.CallOptions;
28 import io.grpc.ClientCall;
29 import io.grpc.ConnectivityState;
30 import io.grpc.ConnectivityStateInfo;
31 import io.grpc.Context;
32 import io.grpc.EquivalentAddressGroup;
33 import io.grpc.InternalChannelz;
34 import io.grpc.InternalChannelz.ChannelStats;
35 import io.grpc.InternalChannelz.ChannelTrace;
36 import io.grpc.InternalInstrumented;
37 import io.grpc.InternalLogId;
38 import io.grpc.InternalWithLogId;
39 import io.grpc.LoadBalancer;
40 import io.grpc.LoadBalancer.PickResult;
41 import io.grpc.LoadBalancer.PickSubchannelArgs;
42 import io.grpc.LoadBalancer.Subchannel;
43 import io.grpc.LoadBalancer.SubchannelPicker;
44 import io.grpc.ManagedChannel;
45 import io.grpc.Metadata;
46 import io.grpc.MethodDescriptor;
47 import io.grpc.Status;
48 import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
49 import java.util.Collections;
50 import java.util.List;
51 import java.util.concurrent.CountDownLatch;
52 import java.util.concurrent.Executor;
53 import java.util.concurrent.ScheduledExecutorService;
54 import java.util.concurrent.TimeUnit;
55 import java.util.logging.Level;
56 import java.util.logging.Logger;
57 import javax.annotation.CheckForNull;
58 import javax.annotation.Nullable;
59 import javax.annotation.concurrent.ThreadSafe;
60 
61 /**
62  * A ManagedChannel backed by a single {@link InternalSubchannel} and used for {@link LoadBalancer}
63  * to its own RPC needs.
64  */
65 @ThreadSafe
66 final class OobChannel extends ManagedChannel implements InternalInstrumented<ChannelStats> {
67   private static final Logger log = Logger.getLogger(OobChannel.class.getName());
68 
69   private InternalSubchannel subchannel;
70   private AbstractSubchannel subchannelImpl;
71   private SubchannelPicker subchannelPicker;
72 
73   private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
74   private final String authority;
75   private final DelayedClientTransport delayedTransport;
76   private final InternalChannelz channelz;
77   private final ObjectPool<? extends Executor> executorPool;
78   private final Executor executor;
79   private final ScheduledExecutorService deadlineCancellationExecutor;
80   private final CountDownLatch terminatedLatch = new CountDownLatch(1);
81   private volatile boolean shutdown;
82   private final CallTracer channelCallsTracer;
83   @CheckForNull
84   private final ChannelTracer channelTracer;
85   private final TimeProvider timeProvider;
86 
87   private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
88     @Override
89     public ClientTransport get(PickSubchannelArgs args) {
90       // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
91       // matter here because OOB communication should be sparse, and it's not on application RPC's
92       // critical path.
93       return delayedTransport;
94     }
95 
96     @Override
97     public <ReqT> RetriableStream<ReqT> newRetriableStream(MethodDescriptor<ReqT, ?> method,
98         CallOptions callOptions, Metadata headers, Context context) {
99       throw new UnsupportedOperationException("OobChannel should not create retriable streams");
100     }
101   };
102 
OobChannel( String authority, ObjectPool<? extends Executor> executorPool, ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor, CallTracer callsTracer, @Nullable ChannelTracer channelTracer, InternalChannelz channelz, TimeProvider timeProvider)103   OobChannel(
104       String authority, ObjectPool<? extends Executor> executorPool,
105       ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor,
106       CallTracer callsTracer, @Nullable  ChannelTracer channelTracer, InternalChannelz channelz,
107       TimeProvider timeProvider) {
108     this.authority = checkNotNull(authority, "authority");
109     this.executorPool = checkNotNull(executorPool, "executorPool");
110     this.executor = checkNotNull(executorPool.getObject(), "executor");
111     this.deadlineCancellationExecutor = checkNotNull(
112         deadlineCancellationExecutor, "deadlineCancellationExecutor");
113     this.delayedTransport = new DelayedClientTransport(executor, channelExecutor);
114     this.channelz = Preconditions.checkNotNull(channelz);
115     this.delayedTransport.start(new ManagedClientTransport.Listener() {
116         @Override
117         public void transportShutdown(Status s) {
118           // Don't care
119         }
120 
121         @Override
122         public void transportTerminated() {
123           subchannelImpl.shutdown();
124         }
125 
126         @Override
127         public void transportReady() {
128           // Don't care
129         }
130 
131         @Override
132         public void transportInUse(boolean inUse) {
133           // Don't care
134         }
135       });
136     this.channelCallsTracer = callsTracer;
137     this.channelTracer = channelTracer;
138     this.timeProvider = timeProvider;
139   }
140 
141   // Must be called only once, right after the OobChannel is created.
setSubchannel(final InternalSubchannel subchannel)142   void setSubchannel(final InternalSubchannel subchannel) {
143     log.log(Level.FINE, "[{0}] Created with [{1}]", new Object[] {this, subchannel});
144     this.subchannel = subchannel;
145     subchannelImpl = new AbstractSubchannel() {
146         @Override
147         public void shutdown() {
148           subchannel.shutdown(Status.UNAVAILABLE.withDescription("OobChannel is shutdown"));
149         }
150 
151         @Override
152         ClientTransport obtainActiveTransport() {
153           return subchannel.obtainActiveTransport();
154         }
155 
156         @Override
157         InternalInstrumented<ChannelStats> getInternalSubchannel() {
158           return subchannel;
159         }
160 
161         @Override
162         public void requestConnection() {
163           subchannel.obtainActiveTransport();
164         }
165 
166         @Override
167         public List<EquivalentAddressGroup> getAllAddresses() {
168           return subchannel.getAddressGroups();
169         }
170 
171         @Override
172         public Attributes getAttributes() {
173           return Attributes.EMPTY;
174         }
175     };
176 
177     subchannelPicker = new SubchannelPicker() {
178         final PickResult result = PickResult.withSubchannel(subchannelImpl);
179 
180         @Override
181         public PickResult pickSubchannel(PickSubchannelArgs args) {
182           return result;
183         }
184       };
185     delayedTransport.reprocess(subchannelPicker);
186   }
187 
updateAddresses(EquivalentAddressGroup eag)188   void updateAddresses(EquivalentAddressGroup eag) {
189     subchannel.updateAddresses(Collections.singletonList(eag));
190   }
191 
192   @Override
newCall( MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions)193   public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
194       MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
195     return new ClientCallImpl<RequestT, ResponseT>(methodDescriptor,
196         callOptions.getExecutor() == null ? executor : callOptions.getExecutor(),
197         callOptions, transportProvider, deadlineCancellationExecutor, channelCallsTracer,
198         false /* retryEnabled */);
199   }
200 
201   @Override
authority()202   public String authority() {
203     return authority;
204   }
205 
206   @Override
isTerminated()207   public boolean isTerminated() {
208     return terminatedLatch.getCount() == 0;
209   }
210 
211   @Override
awaitTermination(long time, TimeUnit unit)212   public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException {
213     return terminatedLatch.await(time, unit);
214   }
215 
216   @Override
getState(boolean requestConnectionIgnored)217   public ConnectivityState getState(boolean requestConnectionIgnored) {
218     if (subchannel == null) {
219       return ConnectivityState.IDLE;
220     }
221     return subchannel.getState();
222   }
223 
224   @Override
shutdown()225   public ManagedChannel shutdown() {
226     shutdown = true;
227     delayedTransport.shutdown(Status.UNAVAILABLE.withDescription("OobChannel.shutdown() called"));
228     return this;
229   }
230 
231   @Override
isShutdown()232   public boolean isShutdown() {
233     return shutdown;
234   }
235 
236   @Override
shutdownNow()237   public ManagedChannel shutdownNow() {
238     shutdown = true;
239     delayedTransport.shutdownNow(
240         Status.UNAVAILABLE.withDescription("OobChannel.shutdownNow() called"));
241     return this;
242   }
243 
handleSubchannelStateChange(final ConnectivityStateInfo newState)244   void handleSubchannelStateChange(final ConnectivityStateInfo newState) {
245     if (channelTracer != null) {
246       channelTracer.reportEvent(
247           new ChannelTrace.Event.Builder()
248               .setDescription("Entering " + newState.getState() + " state")
249               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
250               .setTimestampNanos(timeProvider.currentTimeNanos())
251               .build());
252     }
253     switch (newState.getState()) {
254       case READY:
255       case IDLE:
256         delayedTransport.reprocess(subchannelPicker);
257         break;
258       case TRANSIENT_FAILURE:
259         delayedTransport.reprocess(new SubchannelPicker() {
260             final PickResult errorResult = PickResult.withError(newState.getStatus());
261 
262             @Override
263             public PickResult pickSubchannel(PickSubchannelArgs args) {
264               return errorResult;
265             }
266           });
267         break;
268       default:
269         // Do nothing
270     }
271   }
272 
273   // must be run from channel executor
handleSubchannelTerminated()274   void handleSubchannelTerminated() {
275     channelz.removeSubchannel(this);
276     // When delayedTransport is terminated, it shuts down subchannel.  Therefore, at this point
277     // both delayedTransport and subchannel have terminated.
278     executorPool.returnObject(executor);
279     terminatedLatch.countDown();
280   }
281 
282   @VisibleForTesting
getSubchannel()283   Subchannel getSubchannel() {
284     return subchannelImpl;
285   }
286 
getInternalSubchannel()287   InternalSubchannel getInternalSubchannel() {
288     return subchannel;
289   }
290 
291   @Override
getStats()292   public ListenableFuture<ChannelStats> getStats() {
293     final SettableFuture<ChannelStats> ret = SettableFuture.create();
294     final ChannelStats.Builder builder = new ChannelStats.Builder();
295     channelCallsTracer.updateBuilder(builder);
296     if (channelTracer != null) {
297       channelTracer.updateBuilder(builder);
298     }
299     builder
300         .setTarget(authority)
301         .setState(subchannel.getState())
302         .setSubchannels(Collections.<InternalWithLogId>singletonList(subchannel));
303     ret.set(builder.build());
304     return ret;
305   }
306 
307   @Override
getLogId()308   public InternalLogId getLogId() {
309     return logId;
310   }
311 
312   @Override
toString()313   public String toString() {
314     return MoreObjects.toStringHelper(this)
315         .add("logId", logId.getId())
316         .add("authority", authority)
317         .toString();
318   }
319 
320   @Override
resetConnectBackoff()321   public void resetConnectBackoff() {
322     subchannel.resetConnectBackoff();
323   }
324 }
325