1 #region Copyright notice and license 2 // Copyright 2015 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 #endregion 16 17 using System; 18 using System.Collections.Generic; 19 using System.Threading; 20 using System.Threading.Tasks; 21 22 using Grpc.Core.Internal; 23 using Grpc.Core.Logging; 24 using Grpc.Core.Utils; 25 26 namespace Grpc.Core 27 { 28 /// <summary> 29 /// Represents a gRPC channel. Channels are an abstraction of long-lived connections to remote servers. 30 /// More client objects can reuse the same channel. Creating a channel is an expensive operation compared to invoking 31 /// a remote call so in general you should reuse a single channel for as many calls as possible. 32 /// </summary> 33 public class Channel 34 { 35 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>(); 36 37 readonly object myLock = new object(); 38 readonly AtomicCounter activeCallCounter = new AtomicCounter(); 39 readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource(); 40 41 readonly string target; 42 readonly GrpcEnvironment environment; 43 readonly CompletionQueueSafeHandle completionQueue; 44 readonly ChannelSafeHandle handle; 45 readonly Dictionary<string, ChannelOption> options; 46 47 bool shutdownRequested; 48 49 /// <summary> 50 /// Creates a channel that connects to a specific host. 51 /// Port will default to 80 for an unsecure channel and to 443 for a secure channel. 52 /// </summary> 53 /// <param name="target">Target of the channel.</param> 54 /// <param name="credentials">Credentials to secure the channel.</param> Channel(string target, ChannelCredentials credentials)55 public Channel(string target, ChannelCredentials credentials) : 56 this(target, credentials, null) 57 { 58 } 59 60 /// <summary> 61 /// Creates a channel that connects to a specific host. 62 /// Port will default to 80 for an unsecure channel and to 443 for a secure channel. 63 /// </summary> 64 /// <param name="target">Target of the channel.</param> 65 /// <param name="credentials">Credentials to secure the channel.</param> 66 /// <param name="options">Channel options.</param> Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options)67 public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options) 68 { 69 this.target = GrpcPreconditions.CheckNotNull(target, "target"); 70 this.options = CreateOptionsDictionary(options); 71 EnsureUserAgentChannelOption(this.options); 72 this.environment = GrpcEnvironment.AddRef(); 73 74 this.completionQueue = this.environment.PickCompletionQueue(); 75 using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values)) 76 { 77 var nativeCredentials = credentials.GetNativeCredentials(); 78 if (nativeCredentials != null) 79 { 80 this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs); 81 } 82 else 83 { 84 this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs); 85 } 86 } 87 GrpcEnvironment.RegisterChannel(this); 88 } 89 90 /// <summary> 91 /// Creates a channel that connects to a specific host and port. 92 /// </summary> 93 /// <param name="host">The name or IP address of the host.</param> 94 /// <param name="port">The port.</param> 95 /// <param name="credentials">Credentials to secure the channel.</param> Channel(string host, int port, ChannelCredentials credentials)96 public Channel(string host, int port, ChannelCredentials credentials) : 97 this(host, port, credentials, null) 98 { 99 } 100 101 /// <summary> 102 /// Creates a channel that connects to a specific host and port. 103 /// </summary> 104 /// <param name="host">The name or IP address of the host.</param> 105 /// <param name="port">The port.</param> 106 /// <param name="credentials">Credentials to secure the channel.</param> 107 /// <param name="options">Channel options.</param> Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options)108 public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options) : 109 this(string.Format("{0}:{1}", host, port), credentials, options) 110 { 111 } 112 113 /// <summary> 114 /// Gets current connectivity state of this channel. 115 /// After channel is has been shutdown, <c>ChannelState.Shutdown</c> will be returned. 116 /// </summary> 117 public ChannelState State 118 { 119 get 120 { 121 return GetConnectivityState(false); 122 } 123 } 124 125 // cached handler for watch connectivity state 126 static readonly BatchCompletionDelegate WatchConnectivityStateHandler = (success, ctx, state) => 127 { 128 var tcs = (TaskCompletionSource<bool>) state; 129 tcs.SetResult(success); 130 }; 131 132 /// <summary> 133 /// Returned tasks completes once channel state has become different from 134 /// given lastObservedState. 135 /// If deadline is reached or and error occurs, returned task is cancelled. 136 /// </summary> WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)137 public async Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null) 138 { 139 var result = await TryWaitForStateChangedAsync(lastObservedState, deadline).ConfigureAwait(false); 140 if (!result) 141 { 142 throw new TaskCanceledException("Reached deadline."); 143 } 144 } 145 146 /// <summary> 147 /// Returned tasks completes once channel state has become different from 148 /// given lastObservedState (<c>true</c> is returned) or if the wait has timed out (<c>false</c> is returned). 149 /// </summary> TryWaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)150 public Task<bool> TryWaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null) 151 { 152 GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.Shutdown, 153 "Shutdown is a terminal state. No further state changes can occur."); 154 var tcs = new TaskCompletionSource<bool>(); 155 var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture; 156 lock (myLock) 157 { 158 if (handle.IsClosed) 159 { 160 // If channel has been already shutdown and handle was disposed, we would end up with 161 // an abandoned completion added to the completion registry. Instead, we make sure we fail early. 162 throw new ObjectDisposedException(nameof(handle), "Channel handle has already been disposed."); 163 } 164 else 165 { 166 // pass "tcs" as "state" for WatchConnectivityStateHandler. 167 handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs); 168 } 169 } 170 return tcs.Task; 171 } 172 173 /// <summary>Resolved address of the remote endpoint in URI format.</summary> 174 public string ResolvedTarget 175 { 176 get 177 { 178 return handle.GetTarget(); 179 } 180 } 181 182 /// <summary>The original target used to create the channel.</summary> 183 public string Target 184 { 185 get 186 { 187 return this.target; 188 } 189 } 190 191 /// <summary> 192 /// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked. 193 /// </summary> 194 public CancellationToken ShutdownToken 195 { 196 get 197 { 198 return this.shutdownTokenSource.Token; 199 } 200 } 201 202 /// <summary> 203 /// Allows explicitly requesting channel to connect without starting an RPC. 204 /// Returned task completes once state Ready was seen. If the deadline is reached, 205 /// or channel enters the Shutdown state, the task is cancelled. 206 /// There is no need to call this explicitly unless your use case requires that. 207 /// Starting an RPC on a new channel will request connection implicitly. 208 /// </summary> 209 /// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param> 210 public async Task ConnectAsync(DateTime? deadline = null) 211 { 212 var currentState = GetConnectivityState(true); 213 while (currentState != ChannelState.Ready) 214 { 215 if (currentState == ChannelState.Shutdown) 216 { 217 throw new OperationCanceledException("Channel has reached Shutdown state."); 218 } 219 await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false); 220 currentState = GetConnectivityState(false); 221 } 222 } 223 224 /// <summary> 225 /// Shuts down the channel cleanly. It is strongly recommended to shutdown 226 /// all previously created channels before exiting from the process. 227 /// </summary> 228 /// <remarks> 229 /// This method doesn't wait for all calls on this channel to finish (nor does 230 /// it explicitly cancel all outstanding calls). It is user's responsibility to make sure 231 /// all the calls on this channel have finished (successfully or with an error) 232 /// before shutting down the channel to ensure channel shutdown won't impact 233 /// the outcome of those remote calls. 234 /// </remarks> ShutdownAsync()235 public async Task ShutdownAsync() 236 { 237 lock (myLock) 238 { 239 GrpcPreconditions.CheckState(!shutdownRequested); 240 shutdownRequested = true; 241 } 242 GrpcEnvironment.UnregisterChannel(this); 243 244 shutdownTokenSource.Cancel(); 245 246 var activeCallCount = activeCallCounter.Count; 247 if (activeCallCount > 0) 248 { 249 Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount); 250 } 251 252 lock (myLock) 253 { 254 handle.Dispose(); 255 } 256 257 await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false); 258 } 259 260 internal ChannelSafeHandle Handle 261 { 262 get 263 { 264 return this.handle; 265 } 266 } 267 268 internal GrpcEnvironment Environment 269 { 270 get 271 { 272 return this.environment; 273 } 274 } 275 276 internal CompletionQueueSafeHandle CompletionQueue 277 { 278 get 279 { 280 return this.completionQueue; 281 } 282 } 283 AddCallReference(object call)284 internal void AddCallReference(object call) 285 { 286 activeCallCounter.Increment(); 287 288 bool success = false; 289 handle.DangerousAddRef(ref success); 290 GrpcPreconditions.CheckState(success); 291 } 292 RemoveCallReference(object call)293 internal void RemoveCallReference(object call) 294 { 295 handle.DangerousRelease(); 296 297 activeCallCounter.Decrement(); 298 } 299 300 // for testing only GetCallReferenceCount()301 internal long GetCallReferenceCount() 302 { 303 return activeCallCounter.Count; 304 } 305 GetConnectivityState(bool tryToConnect)306 private ChannelState GetConnectivityState(bool tryToConnect) 307 { 308 try 309 { 310 lock (myLock) 311 { 312 return handle.CheckConnectivityState(tryToConnect); 313 } 314 } 315 catch (ObjectDisposedException) 316 { 317 return ChannelState.Shutdown; 318 } 319 } 320 EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)321 private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options) 322 { 323 var key = ChannelOptions.PrimaryUserAgentString; 324 var userAgentString = ""; 325 326 ChannelOption option; 327 if (options.TryGetValue(key, out option)) 328 { 329 // user-provided userAgentString needs to be at the beginning 330 userAgentString = option.StringValue + " "; 331 }; 332 333 // TODO(jtattermusch): it would be useful to also provide .NET/mono version. 334 userAgentString += string.Format("grpc-csharp/{0}", VersionInfo.CurrentVersion); 335 336 options[ChannelOptions.PrimaryUserAgentString] = new ChannelOption(key, userAgentString); 337 } 338 CreateOptionsDictionary(IEnumerable<ChannelOption> options)339 private static Dictionary<string, ChannelOption> CreateOptionsDictionary(IEnumerable<ChannelOption> options) 340 { 341 var dict = new Dictionary<string, ChannelOption>(); 342 if (options == null) 343 { 344 return dict; 345 } 346 foreach (var option in options) 347 { 348 dict.Add(option.Name, option); 349 } 350 return dict; 351 } 352 } 353 } 354