1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import com.google.common.base.MoreObjects; 23 import com.google.common.base.Preconditions; 24 import com.google.common.util.concurrent.ListenableFuture; 25 import com.google.common.util.concurrent.SettableFuture; 26 import io.grpc.Attributes; 27 import io.grpc.CallOptions; 28 import io.grpc.ClientCall; 29 import io.grpc.ConnectivityState; 30 import io.grpc.ConnectivityStateInfo; 31 import io.grpc.Context; 32 import io.grpc.EquivalentAddressGroup; 33 import io.grpc.InternalChannelz; 34 import io.grpc.InternalChannelz.ChannelStats; 35 import io.grpc.InternalChannelz.ChannelTrace; 36 import io.grpc.InternalInstrumented; 37 import io.grpc.InternalLogId; 38 import io.grpc.InternalWithLogId; 39 import io.grpc.LoadBalancer; 40 import io.grpc.LoadBalancer.PickResult; 41 import io.grpc.LoadBalancer.PickSubchannelArgs; 42 import io.grpc.LoadBalancer.Subchannel; 43 import io.grpc.LoadBalancer.SubchannelPicker; 44 import io.grpc.ManagedChannel; 45 import io.grpc.Metadata; 46 import io.grpc.MethodDescriptor; 47 import io.grpc.Status; 48 import io.grpc.internal.ClientCallImpl.ClientTransportProvider; 49 import java.util.Collections; 50 import java.util.List; 51 import java.util.concurrent.CountDownLatch; 52 import java.util.concurrent.Executor; 53 import java.util.concurrent.ScheduledExecutorService; 54 import java.util.concurrent.TimeUnit; 55 import java.util.logging.Level; 56 import java.util.logging.Logger; 57 import javax.annotation.CheckForNull; 58 import javax.annotation.Nullable; 59 import javax.annotation.concurrent.ThreadSafe; 60 61 /** 62 * A ManagedChannel backed by a single {@link InternalSubchannel} and used for {@link LoadBalancer} 63 * to its own RPC needs. 64 */ 65 @ThreadSafe 66 final class OobChannel extends ManagedChannel implements InternalInstrumented<ChannelStats> { 67 private static final Logger log = Logger.getLogger(OobChannel.class.getName()); 68 69 private InternalSubchannel subchannel; 70 private AbstractSubchannel subchannelImpl; 71 private SubchannelPicker subchannelPicker; 72 73 private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); 74 private final String authority; 75 private final DelayedClientTransport delayedTransport; 76 private final InternalChannelz channelz; 77 private final ObjectPool<? extends Executor> executorPool; 78 private final Executor executor; 79 private final ScheduledExecutorService deadlineCancellationExecutor; 80 private final CountDownLatch terminatedLatch = new CountDownLatch(1); 81 private volatile boolean shutdown; 82 private final CallTracer channelCallsTracer; 83 @CheckForNull 84 private final ChannelTracer channelTracer; 85 private final TimeProvider timeProvider; 86 87 private final ClientTransportProvider transportProvider = new ClientTransportProvider() { 88 @Override 89 public ClientTransport get(PickSubchannelArgs args) { 90 // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't 91 // matter here because OOB communication should be sparse, and it's not on application RPC's 92 // critical path. 93 return delayedTransport; 94 } 95 96 @Override 97 public <ReqT> RetriableStream<ReqT> newRetriableStream(MethodDescriptor<ReqT, ?> method, 98 CallOptions callOptions, Metadata headers, Context context) { 99 throw new UnsupportedOperationException("OobChannel should not create retriable streams"); 100 } 101 }; 102 OobChannel( String authority, ObjectPool<? extends Executor> executorPool, ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor, CallTracer callsTracer, @Nullable ChannelTracer channelTracer, InternalChannelz channelz, TimeProvider timeProvider)103 OobChannel( 104 String authority, ObjectPool<? extends Executor> executorPool, 105 ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor, 106 CallTracer callsTracer, @Nullable ChannelTracer channelTracer, InternalChannelz channelz, 107 TimeProvider timeProvider) { 108 this.authority = checkNotNull(authority, "authority"); 109 this.executorPool = checkNotNull(executorPool, "executorPool"); 110 this.executor = checkNotNull(executorPool.getObject(), "executor"); 111 this.deadlineCancellationExecutor = checkNotNull( 112 deadlineCancellationExecutor, "deadlineCancellationExecutor"); 113 this.delayedTransport = new DelayedClientTransport(executor, channelExecutor); 114 this.channelz = Preconditions.checkNotNull(channelz); 115 this.delayedTransport.start(new ManagedClientTransport.Listener() { 116 @Override 117 public void transportShutdown(Status s) { 118 // Don't care 119 } 120 121 @Override 122 public void transportTerminated() { 123 subchannelImpl.shutdown(); 124 } 125 126 @Override 127 public void transportReady() { 128 // Don't care 129 } 130 131 @Override 132 public void transportInUse(boolean inUse) { 133 // Don't care 134 } 135 }); 136 this.channelCallsTracer = callsTracer; 137 this.channelTracer = channelTracer; 138 this.timeProvider = timeProvider; 139 } 140 141 // Must be called only once, right after the OobChannel is created. setSubchannel(final InternalSubchannel subchannel)142 void setSubchannel(final InternalSubchannel subchannel) { 143 log.log(Level.FINE, "[{0}] Created with [{1}]", new Object[] {this, subchannel}); 144 this.subchannel = subchannel; 145 subchannelImpl = new AbstractSubchannel() { 146 @Override 147 public void shutdown() { 148 subchannel.shutdown(Status.UNAVAILABLE.withDescription("OobChannel is shutdown")); 149 } 150 151 @Override 152 ClientTransport obtainActiveTransport() { 153 return subchannel.obtainActiveTransport(); 154 } 155 156 @Override 157 InternalInstrumented<ChannelStats> getInternalSubchannel() { 158 return subchannel; 159 } 160 161 @Override 162 public void requestConnection() { 163 subchannel.obtainActiveTransport(); 164 } 165 166 @Override 167 public List<EquivalentAddressGroup> getAllAddresses() { 168 return subchannel.getAddressGroups(); 169 } 170 171 @Override 172 public Attributes getAttributes() { 173 return Attributes.EMPTY; 174 } 175 }; 176 177 subchannelPicker = new SubchannelPicker() { 178 final PickResult result = PickResult.withSubchannel(subchannelImpl); 179 180 @Override 181 public PickResult pickSubchannel(PickSubchannelArgs args) { 182 return result; 183 } 184 }; 185 delayedTransport.reprocess(subchannelPicker); 186 } 187 updateAddresses(EquivalentAddressGroup eag)188 void updateAddresses(EquivalentAddressGroup eag) { 189 subchannel.updateAddresses(Collections.singletonList(eag)); 190 } 191 192 @Override newCall( MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions)193 public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall( 194 MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) { 195 return new ClientCallImpl<RequestT, ResponseT>(methodDescriptor, 196 callOptions.getExecutor() == null ? executor : callOptions.getExecutor(), 197 callOptions, transportProvider, deadlineCancellationExecutor, channelCallsTracer, 198 false /* retryEnabled */); 199 } 200 201 @Override authority()202 public String authority() { 203 return authority; 204 } 205 206 @Override isTerminated()207 public boolean isTerminated() { 208 return terminatedLatch.getCount() == 0; 209 } 210 211 @Override awaitTermination(long time, TimeUnit unit)212 public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException { 213 return terminatedLatch.await(time, unit); 214 } 215 216 @Override getState(boolean requestConnectionIgnored)217 public ConnectivityState getState(boolean requestConnectionIgnored) { 218 if (subchannel == null) { 219 return ConnectivityState.IDLE; 220 } 221 return subchannel.getState(); 222 } 223 224 @Override shutdown()225 public ManagedChannel shutdown() { 226 shutdown = true; 227 delayedTransport.shutdown(Status.UNAVAILABLE.withDescription("OobChannel.shutdown() called")); 228 return this; 229 } 230 231 @Override isShutdown()232 public boolean isShutdown() { 233 return shutdown; 234 } 235 236 @Override shutdownNow()237 public ManagedChannel shutdownNow() { 238 shutdown = true; 239 delayedTransport.shutdownNow( 240 Status.UNAVAILABLE.withDescription("OobChannel.shutdownNow() called")); 241 return this; 242 } 243 handleSubchannelStateChange(final ConnectivityStateInfo newState)244 void handleSubchannelStateChange(final ConnectivityStateInfo newState) { 245 if (channelTracer != null) { 246 channelTracer.reportEvent( 247 new ChannelTrace.Event.Builder() 248 .setDescription("Entering " + newState.getState() + " state") 249 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 250 .setTimestampNanos(timeProvider.currentTimeNanos()) 251 .build()); 252 } 253 switch (newState.getState()) { 254 case READY: 255 case IDLE: 256 delayedTransport.reprocess(subchannelPicker); 257 break; 258 case TRANSIENT_FAILURE: 259 delayedTransport.reprocess(new SubchannelPicker() { 260 final PickResult errorResult = PickResult.withError(newState.getStatus()); 261 262 @Override 263 public PickResult pickSubchannel(PickSubchannelArgs args) { 264 return errorResult; 265 } 266 }); 267 break; 268 default: 269 // Do nothing 270 } 271 } 272 273 // must be run from channel executor handleSubchannelTerminated()274 void handleSubchannelTerminated() { 275 channelz.removeSubchannel(this); 276 // When delayedTransport is terminated, it shuts down subchannel. Therefore, at this point 277 // both delayedTransport and subchannel have terminated. 278 executorPool.returnObject(executor); 279 terminatedLatch.countDown(); 280 } 281 282 @VisibleForTesting getSubchannel()283 Subchannel getSubchannel() { 284 return subchannelImpl; 285 } 286 getInternalSubchannel()287 InternalSubchannel getInternalSubchannel() { 288 return subchannel; 289 } 290 291 @Override getStats()292 public ListenableFuture<ChannelStats> getStats() { 293 final SettableFuture<ChannelStats> ret = SettableFuture.create(); 294 final ChannelStats.Builder builder = new ChannelStats.Builder(); 295 channelCallsTracer.updateBuilder(builder); 296 if (channelTracer != null) { 297 channelTracer.updateBuilder(builder); 298 } 299 builder 300 .setTarget(authority) 301 .setState(subchannel.getState()) 302 .setSubchannels(Collections.<InternalWithLogId>singletonList(subchannel)); 303 ret.set(builder.build()); 304 return ret; 305 } 306 307 @Override getLogId()308 public InternalLogId getLogId() { 309 return logId; 310 } 311 312 @Override toString()313 public String toString() { 314 return MoreObjects.toStringHelper(this) 315 .add("logId", logId.getId()) 316 .add("authority", authority) 317 .toString(); 318 } 319 320 @Override resetConnectBackoff()321 public void resetConnectBackoff() { 322 subchannel.resetConnectBackoff(); 323 } 324 } 325