1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Preconditions;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import io.grpc.Attributes;
25 import io.grpc.BinaryLog;
26 import io.grpc.ClientInterceptor;
27 import io.grpc.CompressorRegistry;
28 import io.grpc.DecompressorRegistry;
29 import io.grpc.EquivalentAddressGroup;
30 import io.grpc.InternalChannelz;
31 import io.grpc.LoadBalancer;
32 import io.grpc.ManagedChannel;
33 import io.grpc.ManagedChannelBuilder;
34 import io.grpc.NameResolver;
35 import io.grpc.NameResolverProvider;
36 import io.opencensus.trace.Tracing;
37 import java.net.SocketAddress;
38 import java.net.URI;
39 import java.net.URISyntaxException;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.Collections;
43 import java.util.List;
44 import java.util.concurrent.Executor;
45 import java.util.concurrent.TimeUnit;
46 import javax.annotation.Nullable;
47 
48 /**
49  * The base class for channel builders.
50  *
51  * @param <T> The concrete type of this builder.
52  */
53 public abstract class AbstractManagedChannelImplBuilder
54         <T extends AbstractManagedChannelImplBuilder<T>> extends ManagedChannelBuilder<T> {
55   private static final String DIRECT_ADDRESS_SCHEME = "directaddress";
56 
forAddress(String name, int port)57   public static ManagedChannelBuilder<?> forAddress(String name, int port) {
58     throw new UnsupportedOperationException("Subclass failed to hide static factory");
59   }
60 
forTarget(String target)61   public static ManagedChannelBuilder<?> forTarget(String target) {
62     throw new UnsupportedOperationException("Subclass failed to hide static factory");
63   }
64 
65   /**
66    * An idle timeout larger than this would disable idle mode.
67    */
68   @VisibleForTesting
69   static final long IDLE_MODE_MAX_TIMEOUT_DAYS = 30;
70 
71   /**
72    * The default idle timeout.
73    */
74   @VisibleForTesting
75   static final long IDLE_MODE_DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30);
76 
77   /**
78    * An idle timeout smaller than this would be capped to it.
79    */
80   @VisibleForTesting
81   static final long IDLE_MODE_MIN_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1);
82 
83   private static final ObjectPool<? extends Executor> DEFAULT_EXECUTOR_POOL =
84       SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
85 
86   private static final NameResolver.Factory DEFAULT_NAME_RESOLVER_FACTORY =
87       NameResolverProvider.asFactory();
88 
89   private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY =
90       DecompressorRegistry.getDefaultInstance();
91 
92   private static final CompressorRegistry DEFAULT_COMPRESSOR_REGISTRY =
93       CompressorRegistry.getDefaultInstance();
94 
95   private static final long DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES = 1L << 24;  // 16M
96   private static final long DEFAULT_PER_RPC_BUFFER_LIMIT_IN_BYTES = 1L << 20; // 1M
97 
98   ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;
99 
100   private final List<ClientInterceptor> interceptors = new ArrayList<>();
101 
102   // Access via getter, which may perform authority override as needed
103   private NameResolver.Factory nameResolverFactory = DEFAULT_NAME_RESOLVER_FACTORY;
104 
105   final String target;
106 
107   @Nullable
108   private final SocketAddress directServerAddress;
109 
110   @Nullable
111   String userAgent;
112 
113   @VisibleForTesting
114   @Nullable
115   String authorityOverride;
116 
117 
118   @Nullable LoadBalancer.Factory loadBalancerFactory;
119 
120   boolean fullStreamDecompression;
121 
122   DecompressorRegistry decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
123 
124   CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY;
125 
126   long idleTimeoutMillis = IDLE_MODE_DEFAULT_TIMEOUT_MILLIS;
127 
128   int maxRetryAttempts = 5;
129   int maxHedgedAttempts = 5;
130   long retryBufferSize = DEFAULT_RETRY_BUFFER_SIZE_IN_BYTES;
131   long perRpcBufferLimit = DEFAULT_PER_RPC_BUFFER_LIMIT_IN_BYTES;
132   boolean retryEnabled = false; // TODO(zdapeng): default to true
133   // Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know
134   // what should be the desired behavior for retry + stats/tracing.
135   // TODO(zdapeng): delete me
136   boolean temporarilyDisableRetry;
137 
138   InternalChannelz channelz = InternalChannelz.instance();
139   int maxTraceEvents;
140 
141   protected TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory();
142 
143   private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
144 
145   @Nullable
146   BinaryLog binlog;
147 
148   /**
149    * Sets the maximum message size allowed for a single gRPC frame. If an inbound messages
150    * larger than this limit is received it will not be processed and the RPC will fail with
151    * RESOURCE_EXHAUSTED.
152    */
153   // Can be overridden by subclasses.
154   @Override
maxInboundMessageSize(int max)155   public T maxInboundMessageSize(int max) {
156     checkArgument(max >= 0, "negative max");
157     maxInboundMessageSize = max;
158     return thisT();
159   }
160 
maxInboundMessageSize()161   protected final int maxInboundMessageSize() {
162     return maxInboundMessageSize;
163   }
164 
165   private boolean statsEnabled = true;
166   private boolean recordStartedRpcs = true;
167   private boolean recordFinishedRpcs = true;
168   private boolean tracingEnabled = true;
169 
170   @Nullable
171   private CensusStatsModule censusStatsOverride;
172 
AbstractManagedChannelImplBuilder(String target)173   protected AbstractManagedChannelImplBuilder(String target) {
174     this.target = Preconditions.checkNotNull(target, "target");
175     this.directServerAddress = null;
176   }
177 
178   /**
179    * Returns a target string for the SocketAddress. It is only used as a placeholder, because
180    * DirectAddressNameResolverFactory will not actually try to use it. However, it must be a valid
181    * URI.
182    */
183   @VisibleForTesting
makeTargetStringForDirectAddress(SocketAddress address)184   static String makeTargetStringForDirectAddress(SocketAddress address) {
185     try {
186       return new URI(DIRECT_ADDRESS_SCHEME, "", "/" + address, null).toString();
187     } catch (URISyntaxException e) {
188       // It should not happen.
189       throw new RuntimeException(e);
190     }
191   }
192 
AbstractManagedChannelImplBuilder(SocketAddress directServerAddress, String authority)193   protected AbstractManagedChannelImplBuilder(SocketAddress directServerAddress, String authority) {
194     this.target = makeTargetStringForDirectAddress(directServerAddress);
195     this.directServerAddress = directServerAddress;
196     this.nameResolverFactory = new DirectAddressNameResolverFactory(directServerAddress, authority);
197   }
198 
199   @Override
directExecutor()200   public final T directExecutor() {
201     return executor(MoreExecutors.directExecutor());
202   }
203 
204   @Override
executor(Executor executor)205   public final T executor(Executor executor) {
206     if (executor != null) {
207       this.executorPool = new FixedObjectPool<Executor>(executor);
208     } else {
209       this.executorPool = DEFAULT_EXECUTOR_POOL;
210     }
211     return thisT();
212   }
213 
214   @Override
intercept(List<ClientInterceptor> interceptors)215   public final T intercept(List<ClientInterceptor> interceptors) {
216     this.interceptors.addAll(interceptors);
217     return thisT();
218   }
219 
220   @Override
intercept(ClientInterceptor... interceptors)221   public final T intercept(ClientInterceptor... interceptors) {
222     return intercept(Arrays.asList(interceptors));
223   }
224 
225   @Override
nameResolverFactory(NameResolver.Factory resolverFactory)226   public final T nameResolverFactory(NameResolver.Factory resolverFactory) {
227     Preconditions.checkState(directServerAddress == null,
228         "directServerAddress is set (%s), which forbids the use of NameResolverFactory",
229         directServerAddress);
230     if (resolverFactory != null) {
231       this.nameResolverFactory = resolverFactory;
232     } else {
233       this.nameResolverFactory = DEFAULT_NAME_RESOLVER_FACTORY;
234     }
235     return thisT();
236   }
237 
238   @Override
loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory)239   public final T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory) {
240     Preconditions.checkState(directServerAddress == null,
241         "directServerAddress is set (%s), which forbids the use of LoadBalancer.Factory",
242         directServerAddress);
243     this.loadBalancerFactory = loadBalancerFactory;
244     return thisT();
245   }
246 
247   @Override
enableFullStreamDecompression()248   public final T enableFullStreamDecompression() {
249     this.fullStreamDecompression = true;
250     return thisT();
251   }
252 
253   @Override
decompressorRegistry(DecompressorRegistry registry)254   public final T decompressorRegistry(DecompressorRegistry registry) {
255     if (registry != null) {
256       this.decompressorRegistry = registry;
257     } else {
258       this.decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
259     }
260     return thisT();
261   }
262 
263   @Override
compressorRegistry(CompressorRegistry registry)264   public final T compressorRegistry(CompressorRegistry registry) {
265     if (registry != null) {
266       this.compressorRegistry = registry;
267     } else {
268       this.compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY;
269     }
270     return thisT();
271   }
272 
273   @Override
userAgent(@ullable String userAgent)274   public final T userAgent(@Nullable String userAgent) {
275     this.userAgent = userAgent;
276     return thisT();
277   }
278 
279   @Override
overrideAuthority(String authority)280   public final T overrideAuthority(String authority) {
281     this.authorityOverride = checkAuthority(authority);
282     return thisT();
283   }
284 
285   @Override
idleTimeout(long value, TimeUnit unit)286   public final T idleTimeout(long value, TimeUnit unit) {
287     checkArgument(value > 0, "idle timeout is %s, but must be positive", value);
288     // We convert to the largest unit to avoid overflow
289     if (unit.toDays(value) >= IDLE_MODE_MAX_TIMEOUT_DAYS) {
290       // This disables idle mode
291       this.idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE;
292     } else {
293       this.idleTimeoutMillis = Math.max(unit.toMillis(value), IDLE_MODE_MIN_TIMEOUT_MILLIS);
294     }
295     return thisT();
296   }
297 
298   @Override
maxRetryAttempts(int maxRetryAttempts)299   public final T maxRetryAttempts(int maxRetryAttempts) {
300     this.maxRetryAttempts = maxRetryAttempts;
301     return thisT();
302   }
303 
304   @Override
maxHedgedAttempts(int maxHedgedAttempts)305   public final T maxHedgedAttempts(int maxHedgedAttempts) {
306     this.maxHedgedAttempts = maxHedgedAttempts;
307     return thisT();
308   }
309 
310   @Override
retryBufferSize(long bytes)311   public final T retryBufferSize(long bytes) {
312     checkArgument(bytes > 0L, "retry buffer size must be positive");
313     retryBufferSize = bytes;
314     return thisT();
315   }
316 
317   @Override
perRpcBufferLimit(long bytes)318   public final T perRpcBufferLimit(long bytes) {
319     checkArgument(bytes > 0L, "per RPC buffer limit must be positive");
320     perRpcBufferLimit = bytes;
321     return thisT();
322   }
323 
324   @Override
disableRetry()325   public final T disableRetry() {
326     retryEnabled = false;
327     return thisT();
328   }
329 
330   @Override
enableRetry()331   public final T enableRetry() {
332     retryEnabled = true;
333     return thisT();
334   }
335 
336   @Override
setBinaryLog(BinaryLog binlog)337   public final T setBinaryLog(BinaryLog binlog) {
338     this.binlog = binlog;
339     return thisT();
340   }
341 
342   @Override
maxTraceEvents(int maxTraceEvents)343   public T maxTraceEvents(int maxTraceEvents) {
344     checkArgument(maxTraceEvents >= 0, "maxTraceEvents must be non-negative");
345     this.maxTraceEvents = maxTraceEvents;
346     return thisT();
347   }
348 
349   /**
350    * Override the default stats implementation.
351    */
352   @VisibleForTesting
overrideCensusStatsModule(CensusStatsModule censusStats)353   protected final T overrideCensusStatsModule(CensusStatsModule censusStats) {
354     this.censusStatsOverride = censusStats;
355     return thisT();
356   }
357 
358   /**
359    * Disable or enable stats features.  Enabled by default.
360    */
setStatsEnabled(boolean value)361   protected void setStatsEnabled(boolean value) {
362     statsEnabled = value;
363   }
364 
365   /**
366    * Disable or enable stats recording for RPC upstarts.  Effective only if {@link
367    * #setStatsEnabled} is set to true.  Enabled by default.
368    */
setStatsRecordStartedRpcs(boolean value)369   protected void setStatsRecordStartedRpcs(boolean value) {
370     recordStartedRpcs = value;
371   }
372 
373   /**
374    * Disable or enable stats recording for RPC completions.  Effective only if {@link
375    * #setStatsEnabled} is set to true.  Enabled by default.
376    */
setStatsRecordFinishedRpcs(boolean value)377   protected void setStatsRecordFinishedRpcs(boolean value) {
378     recordFinishedRpcs = value;
379   }
380 
381   /**
382    * Disable or enable tracing features.  Enabled by default.
383    */
setTracingEnabled(boolean value)384   protected void setTracingEnabled(boolean value) {
385     tracingEnabled = value;
386   }
387 
388   @VisibleForTesting
getIdleTimeoutMillis()389   final long getIdleTimeoutMillis() {
390     return idleTimeoutMillis;
391   }
392 
393   /**
394    * Verifies the authority is valid.  This method exists as an escape hatch for putting in an
395    * authority that is valid, but would fail the default validation provided by this
396    * implementation.
397    */
checkAuthority(String authority)398   protected String checkAuthority(String authority) {
399     return GrpcUtil.checkAuthority(authority);
400   }
401 
402   @Override
build()403   public ManagedChannel build() {
404     return new ManagedChannelOrphanWrapper(new ManagedChannelImpl(
405         this,
406         buildTransportFactory(),
407         // TODO(carl-mastrangelo): Allow clients to pass this in
408         new ExponentialBackoffPolicy.Provider(),
409         SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
410         GrpcUtil.STOPWATCH_SUPPLIER,
411         getEffectiveInterceptors(),
412         TimeProvider.SYSTEM_TIME_PROVIDER));
413   }
414 
415   // Temporarily disable retry when stats or tracing is enabled to avoid breakage, until we know
416   // what should be the desired behavior for retry + stats/tracing.
417   // TODO(zdapeng): FIX IT
418   @VisibleForTesting
getEffectiveInterceptors()419   final List<ClientInterceptor> getEffectiveInterceptors() {
420     List<ClientInterceptor> effectiveInterceptors =
421         new ArrayList<>(this.interceptors);
422     temporarilyDisableRetry = false;
423     if (statsEnabled) {
424       temporarilyDisableRetry = true;
425       CensusStatsModule censusStats = this.censusStatsOverride;
426       if (censusStats == null) {
427         censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true);
428       }
429       // First interceptor runs last (see ClientInterceptors.intercept()), so that no
430       // other interceptor can override the tracer factory we set in CallOptions.
431       effectiveInterceptors.add(
432           0, censusStats.getClientInterceptor(recordStartedRpcs, recordFinishedRpcs));
433     }
434     if (tracingEnabled) {
435       temporarilyDisableRetry = true;
436       CensusTracingModule censusTracing =
437           new CensusTracingModule(Tracing.getTracer(),
438               Tracing.getPropagationComponent().getBinaryFormat());
439       effectiveInterceptors.add(0, censusTracing.getClientInterceptor());
440     }
441     return effectiveInterceptors;
442   }
443 
444   /**
445    * Subclasses should override this method to provide the {@link ClientTransportFactory}
446    * appropriate for this channel. This method is meant for Transport implementors and should not
447    * be used by normal users.
448    */
buildTransportFactory()449   protected abstract ClientTransportFactory buildTransportFactory();
450 
451   /**
452    * Subclasses can override this method to provide additional parameters to {@link
453    * NameResolver.Factory#newNameResolver}. The default implementation returns {@link
454    * Attributes#EMPTY}.
455    */
getNameResolverParams()456   protected Attributes getNameResolverParams() {
457     return Attributes.EMPTY;
458   }
459 
460   /**
461    * Returns a {@link NameResolver.Factory} for the channel.
462    */
getNameResolverFactory()463   NameResolver.Factory getNameResolverFactory() {
464     if (authorityOverride == null) {
465       return nameResolverFactory;
466     } else {
467       return new OverrideAuthorityNameResolverFactory(nameResolverFactory, authorityOverride);
468     }
469   }
470 
471   private static class DirectAddressNameResolverFactory extends NameResolver.Factory {
472     final SocketAddress address;
473     final String authority;
474 
DirectAddressNameResolverFactory(SocketAddress address, String authority)475     DirectAddressNameResolverFactory(SocketAddress address, String authority) {
476       this.address = address;
477       this.authority = authority;
478     }
479 
480     @Override
newNameResolver(URI notUsedUri, Attributes params)481     public NameResolver newNameResolver(URI notUsedUri, Attributes params) {
482       return new NameResolver() {
483         @Override
484         public String getServiceAuthority() {
485           return authority;
486         }
487 
488         @Override
489         public void start(final Listener listener) {
490           listener.onAddresses(
491               Collections.singletonList(new EquivalentAddressGroup(address)),
492               Attributes.EMPTY);
493         }
494 
495         @Override
496         public void shutdown() {}
497       };
498     }
499 
500     @Override
getDefaultScheme()501     public String getDefaultScheme() {
502       return DIRECT_ADDRESS_SCHEME;
503     }
504   }
505 
506   /**
507    * Returns the correctly typed version of the builder.
508    */
509   private T thisT() {
510     @SuppressWarnings("unchecked")
511     T thisT = (T) this;
512     return thisT;
513   }
514 }
515