1 #region Copyright notice and license
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 #endregion
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Threading;
20 using System.Threading.Tasks;
21 
22 using Grpc.Core.Internal;
23 using Grpc.Core.Logging;
24 using Grpc.Core.Utils;
25 
26 namespace Grpc.Core
27 {
28     /// <summary>
29     /// Represents a gRPC channel. Channels are an abstraction of long-lived connections to remote servers.
30     /// More client objects can reuse the same channel. Creating a channel is an expensive operation compared to invoking
31     /// a remote call so in general you should reuse a single channel for as many calls as possible.
32     /// </summary>
33     public class Channel
34     {
35         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
36 
37         readonly object myLock = new object();
38         readonly AtomicCounter activeCallCounter = new AtomicCounter();
39         readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource();
40 
41         readonly string target;
42         readonly GrpcEnvironment environment;
43         readonly CompletionQueueSafeHandle completionQueue;
44         readonly ChannelSafeHandle handle;
45         readonly Dictionary<string, ChannelOption> options;
46 
47         bool shutdownRequested;
48 
49         /// <summary>
50         /// Creates a channel that connects to a specific host.
51         /// Port will default to 80 for an unsecure channel and to 443 for a secure channel.
52         /// </summary>
53         /// <param name="target">Target of the channel.</param>
54         /// <param name="credentials">Credentials to secure the channel.</param>
Channel(string target, ChannelCredentials credentials)55         public Channel(string target, ChannelCredentials credentials) :
56             this(target, credentials, null)
57         {
58         }
59 
60         /// <summary>
61         /// Creates a channel that connects to a specific host.
62         /// Port will default to 80 for an unsecure channel and to 443 for a secure channel.
63         /// </summary>
64         /// <param name="target">Target of the channel.</param>
65         /// <param name="credentials">Credentials to secure the channel.</param>
66         /// <param name="options">Channel options.</param>
Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options)67         public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options)
68         {
69             this.target = GrpcPreconditions.CheckNotNull(target, "target");
70             this.options = CreateOptionsDictionary(options);
71             EnsureUserAgentChannelOption(this.options);
72             this.environment = GrpcEnvironment.AddRef();
73 
74             this.completionQueue = this.environment.PickCompletionQueue();
75             using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values))
76             {
77                 var nativeCredentials = credentials.GetNativeCredentials();
78                 if (nativeCredentials != null)
79                 {
80                     this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs);
81                 }
82                 else
83                 {
84                     this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs);
85                 }
86             }
87             GrpcEnvironment.RegisterChannel(this);
88         }
89 
90         /// <summary>
91         /// Creates a channel that connects to a specific host and port.
92         /// </summary>
93         /// <param name="host">The name or IP address of the host.</param>
94         /// <param name="port">The port.</param>
95         /// <param name="credentials">Credentials to secure the channel.</param>
Channel(string host, int port, ChannelCredentials credentials)96         public Channel(string host, int port, ChannelCredentials credentials) :
97             this(host, port, credentials, null)
98         {
99         }
100 
101         /// <summary>
102         /// Creates a channel that connects to a specific host and port.
103         /// </summary>
104         /// <param name="host">The name or IP address of the host.</param>
105         /// <param name="port">The port.</param>
106         /// <param name="credentials">Credentials to secure the channel.</param>
107         /// <param name="options">Channel options.</param>
Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options)108         public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options) :
109             this(string.Format("{0}:{1}", host, port), credentials, options)
110         {
111         }
112 
113         /// <summary>
114         /// Gets current connectivity state of this channel.
115         /// After channel is has been shutdown, <c>ChannelState.Shutdown</c> will be returned.
116         /// </summary>
117         public ChannelState State
118         {
119             get
120             {
121                 return GetConnectivityState(false);
122             }
123         }
124 
125         // cached handler for watch connectivity state
126         static readonly BatchCompletionDelegate WatchConnectivityStateHandler = (success, ctx, state) =>
127         {
128             var tcs = (TaskCompletionSource<bool>) state;
129             tcs.SetResult(success);
130         };
131 
132         /// <summary>
133         /// Returned tasks completes once channel state has become different from
134         /// given lastObservedState.
135         /// If deadline is reached or and error occurs, returned task is cancelled.
136         /// </summary>
WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)137         public async Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
138         {
139             var result = await TryWaitForStateChangedAsync(lastObservedState, deadline).ConfigureAwait(false);
140             if (!result)
141             {
142                 throw new TaskCanceledException("Reached deadline.");
143             }
144         }
145 
146         /// <summary>
147         /// Returned tasks completes once channel state has become different from
148         /// given lastObservedState (<c>true</c> is returned) or if the wait has timed out (<c>false</c> is returned).
149         /// </summary>
TryWaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)150         public Task<bool> TryWaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
151         {
152             GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.Shutdown,
153                 "Shutdown is a terminal state. No further state changes can occur.");
154             var tcs = new TaskCompletionSource<bool>();
155             var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture;
156             lock (myLock)
157             {
158                 if (handle.IsClosed)
159                 {
160                     // If channel has been already shutdown and handle was disposed, we would end up with
161                     // an abandoned completion added to the completion registry. Instead, we make sure we fail early.
162                     throw new ObjectDisposedException(nameof(handle), "Channel handle has already been disposed.");
163                 }
164                 else
165                 {
166                     // pass "tcs" as "state" for WatchConnectivityStateHandler.
167                     handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs);
168                 }
169             }
170             return tcs.Task;
171         }
172 
173         /// <summary>Resolved address of the remote endpoint in URI format.</summary>
174         public string ResolvedTarget
175         {
176             get
177             {
178                 return handle.GetTarget();
179             }
180         }
181 
182         /// <summary>The original target used to create the channel.</summary>
183         public string Target
184         {
185             get
186             {
187                 return this.target;
188             }
189         }
190 
191         /// <summary>
192         /// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked.
193         /// </summary>
194         public CancellationToken ShutdownToken
195         {
196             get
197             {
198                 return this.shutdownTokenSource.Token;
199             }
200         }
201 
202         /// <summary>
203         /// Allows explicitly requesting channel to connect without starting an RPC.
204         /// Returned task completes once state Ready was seen. If the deadline is reached,
205         /// or channel enters the Shutdown state, the task is cancelled.
206         /// There is no need to call this explicitly unless your use case requires that.
207         /// Starting an RPC on a new channel will request connection implicitly.
208         /// </summary>
209         /// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param>
210         public async Task ConnectAsync(DateTime? deadline = null)
211         {
212             var currentState = GetConnectivityState(true);
213             while (currentState != ChannelState.Ready)
214             {
215                 if (currentState == ChannelState.Shutdown)
216                 {
217                     throw new OperationCanceledException("Channel has reached Shutdown state.");
218                 }
219                 await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
220                 currentState = GetConnectivityState(false);
221             }
222         }
223 
224         /// <summary>
225         /// Shuts down the channel cleanly. It is strongly recommended to shutdown
226         /// all previously created channels before exiting from the process.
227         /// </summary>
228         /// <remarks>
229         /// This method doesn't wait for all calls on this channel to finish (nor does
230         /// it explicitly cancel all outstanding calls). It is user's responsibility to make sure
231         /// all the calls on this channel have finished (successfully or with an error)
232         /// before shutting down the channel to ensure channel shutdown won't impact
233         /// the outcome of those remote calls.
234         /// </remarks>
ShutdownAsync()235         public async Task ShutdownAsync()
236         {
237             lock (myLock)
238             {
239                 GrpcPreconditions.CheckState(!shutdownRequested);
240                 shutdownRequested = true;
241             }
242             GrpcEnvironment.UnregisterChannel(this);
243 
244             shutdownTokenSource.Cancel();
245 
246             var activeCallCount = activeCallCounter.Count;
247             if (activeCallCount > 0)
248             {
249                 Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount);
250             }
251 
252             lock (myLock)
253             {
254                 handle.Dispose();
255             }
256 
257             await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
258         }
259 
260         internal ChannelSafeHandle Handle
261         {
262             get
263             {
264                 return this.handle;
265             }
266         }
267 
268         internal GrpcEnvironment Environment
269         {
270             get
271             {
272                 return this.environment;
273             }
274         }
275 
276         internal CompletionQueueSafeHandle CompletionQueue
277         {
278             get
279             {
280                 return this.completionQueue;
281             }
282         }
283 
AddCallReference(object call)284         internal void AddCallReference(object call)
285         {
286             activeCallCounter.Increment();
287 
288             bool success = false;
289             handle.DangerousAddRef(ref success);
290             GrpcPreconditions.CheckState(success);
291         }
292 
RemoveCallReference(object call)293         internal void RemoveCallReference(object call)
294         {
295             handle.DangerousRelease();
296 
297             activeCallCounter.Decrement();
298         }
299 
300         // for testing only
GetCallReferenceCount()301         internal long GetCallReferenceCount()
302         {
303             return activeCallCounter.Count;
304         }
305 
GetConnectivityState(bool tryToConnect)306         private ChannelState GetConnectivityState(bool tryToConnect)
307         {
308             try
309             {
310                 lock (myLock)
311                 {
312                     return handle.CheckConnectivityState(tryToConnect);
313                 }
314             }
315             catch (ObjectDisposedException)
316             {
317                 return ChannelState.Shutdown;
318             }
319         }
320 
EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)321         private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)
322         {
323             var key = ChannelOptions.PrimaryUserAgentString;
324             var userAgentString = "";
325 
326             ChannelOption option;
327             if (options.TryGetValue(key, out option))
328             {
329                 // user-provided userAgentString needs to be at the beginning
330                 userAgentString = option.StringValue + " ";
331             };
332 
333             // TODO(jtattermusch): it would be useful to also provide .NET/mono version.
334             userAgentString += string.Format("grpc-csharp/{0}", VersionInfo.CurrentVersion);
335 
336             options[ChannelOptions.PrimaryUserAgentString] = new ChannelOption(key, userAgentString);
337         }
338 
CreateOptionsDictionary(IEnumerable<ChannelOption> options)339         private static Dictionary<string, ChannelOption> CreateOptionsDictionary(IEnumerable<ChannelOption> options)
340         {
341             var dict = new Dictionary<string, ChannelOption>();
342             if (options == null)
343             {
344                 return dict;
345             }
346             foreach (var option in options)
347             {
348                 dict.Add(option.Name, option);
349             }
350             return dict;
351         }
352     }
353 }
354