1 /*
2  * Copyright 2018 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.services;
18 
19 import com.google.common.base.Preconditions;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.protobuf.Any;
22 import com.google.protobuf.ByteString;
23 import com.google.protobuf.Int64Value;
24 import com.google.protobuf.util.Durations;
25 import com.google.protobuf.util.Timestamps;
26 import io.grpc.ConnectivityState;
27 import io.grpc.InternalChannelz;
28 import io.grpc.InternalChannelz.ChannelStats;
29 import io.grpc.InternalChannelz.ChannelTrace.Event;
30 import io.grpc.InternalChannelz.RootChannelList;
31 import io.grpc.InternalChannelz.ServerList;
32 import io.grpc.InternalChannelz.ServerSocketsList;
33 import io.grpc.InternalChannelz.ServerStats;
34 import io.grpc.InternalChannelz.SocketStats;
35 import io.grpc.InternalChannelz.TransportStats;
36 import io.grpc.InternalInstrumented;
37 import io.grpc.InternalWithLogId;
38 import io.grpc.Status;
39 import io.grpc.channelz.v1.Address;
40 import io.grpc.channelz.v1.Address.OtherAddress;
41 import io.grpc.channelz.v1.Address.TcpIpAddress;
42 import io.grpc.channelz.v1.Address.UdsAddress;
43 import io.grpc.channelz.v1.Channel;
44 import io.grpc.channelz.v1.ChannelConnectivityState;
45 import io.grpc.channelz.v1.ChannelConnectivityState.State;
46 import io.grpc.channelz.v1.ChannelData;
47 import io.grpc.channelz.v1.ChannelRef;
48 import io.grpc.channelz.v1.ChannelTrace;
49 import io.grpc.channelz.v1.ChannelTraceEvent;
50 import io.grpc.channelz.v1.ChannelTraceEvent.Severity;
51 import io.grpc.channelz.v1.GetServerSocketsResponse;
52 import io.grpc.channelz.v1.GetServersResponse;
53 import io.grpc.channelz.v1.GetTopChannelsResponse;
54 import io.grpc.channelz.v1.Security;
55 import io.grpc.channelz.v1.Security.OtherSecurity;
56 import io.grpc.channelz.v1.Security.Tls;
57 import io.grpc.channelz.v1.Server;
58 import io.grpc.channelz.v1.ServerData;
59 import io.grpc.channelz.v1.ServerRef;
60 import io.grpc.channelz.v1.Socket;
61 import io.grpc.channelz.v1.Socket.Builder;
62 import io.grpc.channelz.v1.SocketData;
63 import io.grpc.channelz.v1.SocketOption;
64 import io.grpc.channelz.v1.SocketOptionLinger;
65 import io.grpc.channelz.v1.SocketOptionTcpInfo;
66 import io.grpc.channelz.v1.SocketOptionTimeout;
67 import io.grpc.channelz.v1.SocketRef;
68 import io.grpc.channelz.v1.Subchannel;
69 import io.grpc.channelz.v1.SubchannelRef;
70 import java.net.InetSocketAddress;
71 import java.net.SocketAddress;
72 import java.security.cert.CertificateEncodingException;
73 import java.util.ArrayList;
74 import java.util.Collections;
75 import java.util.List;
76 import java.util.Map.Entry;
77 import java.util.concurrent.ExecutionException;
78 import java.util.logging.Level;
79 import java.util.logging.Logger;
80 
81 /**
82  * A static utility class for turning internal data structures into protos.
83  */
84 final class ChannelzProtoUtil {
85   private static final Logger logger = Logger.getLogger(ChannelzProtoUtil.class.getName());
86 
ChannelzProtoUtil()87   private ChannelzProtoUtil() {
88     // do not instantiate.
89   }
90 
toChannelRef(InternalWithLogId obj)91   static ChannelRef toChannelRef(InternalWithLogId obj) {
92     return ChannelRef
93         .newBuilder()
94         .setChannelId(obj.getLogId().getId())
95         .setName(obj.toString())
96         .build();
97   }
98 
toSubchannelRef(InternalWithLogId obj)99   static SubchannelRef toSubchannelRef(InternalWithLogId obj) {
100     return SubchannelRef
101         .newBuilder()
102         .setSubchannelId(obj.getLogId().getId())
103         .setName(obj.toString())
104         .build();
105   }
106 
toServerRef(InternalWithLogId obj)107   static ServerRef toServerRef(InternalWithLogId obj) {
108     return ServerRef
109         .newBuilder()
110         .setServerId(obj.getLogId().getId())
111         .setName(obj.toString())
112         .build();
113   }
114 
toSocketRef(InternalWithLogId obj)115   static SocketRef toSocketRef(InternalWithLogId obj) {
116     return SocketRef
117         .newBuilder()
118         .setSocketId(obj.getLogId().getId())
119         .setName(obj.toString())
120         .build();
121   }
122 
toServer(InternalInstrumented<ServerStats> obj)123   static Server toServer(InternalInstrumented<ServerStats> obj) {
124     ServerStats stats = getFuture(obj.getStats());
125     Server.Builder builder = Server
126         .newBuilder()
127         .setRef(toServerRef(obj))
128         .setData(toServerData(stats));
129     for (InternalInstrumented<SocketStats> listenSocket : stats.listenSockets) {
130       builder.addListenSocket(toSocketRef(listenSocket));
131     }
132     return builder.build();
133   }
134 
toServerData(ServerStats stats)135   static ServerData toServerData(ServerStats stats) {
136     return ServerData
137         .newBuilder()
138         .setCallsStarted(stats.callsStarted)
139         .setCallsSucceeded(stats.callsSucceeded)
140         .setCallsFailed(stats.callsFailed)
141         .setLastCallStartedTimestamp(Timestamps.fromNanos(stats.lastCallStartedNanos))
142         .build();
143   }
144 
toSecurity(InternalChannelz.Security security)145   static Security toSecurity(InternalChannelz.Security security) {
146     Preconditions.checkNotNull(security);
147     Preconditions.checkState(
148         security.tls != null ^ security.other != null,
149         "one of tls or othersecurity must be non null");
150     if (security.tls != null) {
151       Tls.Builder tlsBuilder
152           = Tls.newBuilder().setStandardName(security.tls.cipherSuiteStandardName);
153       try {
154         if (security.tls.localCert != null) {
155           tlsBuilder.setLocalCertificate(ByteString.copyFrom(
156               security.tls.localCert.getEncoded()));
157         }
158         if (security.tls.remoteCert != null) {
159           tlsBuilder.setRemoteCertificate(ByteString.copyFrom(
160               security.tls.remoteCert.getEncoded()));
161         }
162       } catch (CertificateEncodingException e) {
163         logger.log(Level.FINE, "Caught exception", e);
164       }
165       return Security.newBuilder().setTls(tlsBuilder).build();
166     } else {
167       OtherSecurity.Builder builder = OtherSecurity.newBuilder().setName(security.other.name);
168       if (security.other.any != null) {
169         builder.setValue((Any) security.other.any);
170       }
171       return Security.newBuilder().setOther(builder).build();
172     }
173   }
174 
toSocket(InternalInstrumented<SocketStats> obj)175   static Socket toSocket(InternalInstrumented<SocketStats> obj) {
176     SocketStats socketStats = getFuture(obj.getStats());
177     Builder builder = Socket.newBuilder()
178         .setRef(toSocketRef(obj))
179         .setLocal(toAddress(socketStats.local));
180     if (socketStats.security != null) {
181       builder.setSecurity(toSecurity(socketStats.security));
182     }
183     // listen sockets do not have remote nor data
184     if (socketStats.remote != null) {
185       builder.setRemote(toAddress(socketStats.remote));
186     }
187     builder.setData(extractSocketData(socketStats));
188     return builder.build();
189   }
190 
toAddress(SocketAddress address)191   static Address toAddress(SocketAddress address) {
192     Preconditions.checkNotNull(address);
193     Address.Builder builder = Address.newBuilder();
194     if (address instanceof InetSocketAddress) {
195       InetSocketAddress inetAddress = (InetSocketAddress) address;
196       builder.setTcpipAddress(
197           TcpIpAddress
198               .newBuilder()
199               .setIpAddress(
200                   ByteString.copyFrom(inetAddress.getAddress().getAddress()))
201               .setPort(inetAddress.getPort())
202               .build());
203     } else if (address.getClass().getName().endsWith("io.netty.channel.unix.DomainSocketAddress")) {
204       builder.setUdsAddress(
205           UdsAddress
206               .newBuilder()
207               .setFilename(address.toString()) // DomainSocketAddress.toString returns filename
208               .build());
209     } else {
210       builder.setOtherAddress(OtherAddress.newBuilder().setName(address.toString()).build());
211     }
212     return builder.build();
213   }
214 
extractSocketData(SocketStats socketStats)215   static SocketData extractSocketData(SocketStats socketStats) {
216     SocketData.Builder builder = SocketData.newBuilder();
217     if (socketStats.data != null) {
218       TransportStats s = socketStats.data;
219       builder
220           .setStreamsStarted(s.streamsStarted)
221           .setStreamsSucceeded(s.streamsSucceeded)
222           .setStreamsFailed(s.streamsFailed)
223           .setMessagesSent(s.messagesSent)
224           .setMessagesReceived(s.messagesReceived)
225           .setKeepAlivesSent(s.keepAlivesSent)
226           .setLastLocalStreamCreatedTimestamp(
227               Timestamps.fromNanos(s.lastLocalStreamCreatedTimeNanos))
228           .setLastRemoteStreamCreatedTimestamp(
229               Timestamps.fromNanos(s.lastRemoteStreamCreatedTimeNanos))
230           .setLastMessageSentTimestamp(
231               Timestamps.fromNanos(s.lastMessageSentTimeNanos))
232           .setLastMessageReceivedTimestamp(
233               Timestamps.fromNanos(s.lastMessageReceivedTimeNanos))
234           .setLocalFlowControlWindow(
235               Int64Value.of(s.localFlowControlWindow))
236           .setRemoteFlowControlWindow(
237               Int64Value.of(s.remoteFlowControlWindow));
238     }
239     builder.addAllOption(toSocketOptionsList(socketStats.socketOptions));
240     return builder.build();
241   }
242 
  /** Name given to the socket option built by {@code toSocketOptionLinger}. */
  public static final String SO_LINGER = "SO_LINGER";
  /** Name that {@code toSocketOptionsList} passes to {@code toSocketOptionTimeout}. */
  public static final String SO_TIMEOUT = "SO_TIMEOUT";
  /** Name given to the socket option built by {@code toSocketOptionTcpInfo}. */
  public static final String TCP_INFO = "TCP_INFO";
246 
toSocketOptionLinger(int lingerSeconds)247   static SocketOption toSocketOptionLinger(int lingerSeconds) {
248     final SocketOptionLinger lingerOpt;
249     if (lingerSeconds >= 0) {
250       lingerOpt = SocketOptionLinger
251           .newBuilder()
252           .setActive(true)
253           .setDuration(Durations.fromSeconds(lingerSeconds))
254           .build();
255     } else {
256       lingerOpt = SocketOptionLinger.getDefaultInstance();
257     }
258     return SocketOption
259         .newBuilder()
260         .setName(SO_LINGER)
261         .setAdditional(Any.pack(lingerOpt))
262         .build();
263   }
264 
toSocketOptionTimeout(String name, int timeoutMillis)265   static SocketOption toSocketOptionTimeout(String name, int timeoutMillis) {
266     Preconditions.checkNotNull(name);
267     return SocketOption
268         .newBuilder()
269         .setName(name)
270         .setAdditional(
271             Any.pack(
272                 SocketOptionTimeout
273                     .newBuilder()
274                     .setDuration(Durations.fromMillis(timeoutMillis))
275                     .build()))
276         .build();
277   }
278 
toSocketOptionTcpInfo(InternalChannelz.TcpInfo i)279   static SocketOption toSocketOptionTcpInfo(InternalChannelz.TcpInfo i) {
280     SocketOptionTcpInfo tcpInfo = SocketOptionTcpInfo.newBuilder()
281         .setTcpiState(i.state)
282         .setTcpiCaState(i.caState)
283         .setTcpiRetransmits(i.retransmits)
284         .setTcpiProbes(i.probes)
285         .setTcpiBackoff(i.backoff)
286         .setTcpiOptions(i.options)
287         .setTcpiSndWscale(i.sndWscale)
288         .setTcpiRcvWscale(i.rcvWscale)
289         .setTcpiRto(i.rto)
290         .setTcpiAto(i.ato)
291         .setTcpiSndMss(i.sndMss)
292         .setTcpiRcvMss(i.rcvMss)
293         .setTcpiUnacked(i.unacked)
294         .setTcpiSacked(i.sacked)
295         .setTcpiLost(i.lost)
296         .setTcpiRetrans(i.retrans)
297         .setTcpiFackets(i.fackets)
298         .setTcpiLastDataSent(i.lastDataSent)
299         .setTcpiLastAckSent(i.lastAckSent)
300         .setTcpiLastDataRecv(i.lastDataRecv)
301         .setTcpiLastAckRecv(i.lastAckRecv)
302         .setTcpiPmtu(i.pmtu)
303         .setTcpiRcvSsthresh(i.rcvSsthresh)
304         .setTcpiRtt(i.rtt)
305         .setTcpiRttvar(i.rttvar)
306         .setTcpiSndSsthresh(i.sndSsthresh)
307         .setTcpiSndCwnd(i.sndCwnd)
308         .setTcpiAdvmss(i.advmss)
309         .setTcpiReordering(i.reordering)
310         .build();
311     return SocketOption
312         .newBuilder()
313         .setName(TCP_INFO)
314         .setAdditional(Any.pack(tcpInfo))
315         .build();
316   }
317 
toSocketOptionAdditional(String name, String value)318   static SocketOption toSocketOptionAdditional(String name, String value) {
319     Preconditions.checkNotNull(name);
320     Preconditions.checkNotNull(value);
321     return SocketOption.newBuilder().setName(name).setValue(value).build();
322   }
323 
toSocketOptionsList(InternalChannelz.SocketOptions options)324   static List<SocketOption> toSocketOptionsList(InternalChannelz.SocketOptions options) {
325     Preconditions.checkNotNull(options);
326     List<SocketOption> ret = new ArrayList<>();
327     if (options.lingerSeconds != null) {
328       ret.add(toSocketOptionLinger(options.lingerSeconds));
329     }
330     if (options.soTimeoutMillis != null) {
331       ret.add(toSocketOptionTimeout(SO_TIMEOUT, options.soTimeoutMillis));
332     }
333     if (options.tcpInfo != null) {
334       ret.add(toSocketOptionTcpInfo(options.tcpInfo));
335     }
336     for (Entry<String, String> entry : options.others.entrySet()) {
337       ret.add(toSocketOptionAdditional(entry.getKey(), entry.getValue()));
338     }
339     return ret;
340   }
341 
toChannel(InternalInstrumented<ChannelStats> channel)342   static Channel toChannel(InternalInstrumented<ChannelStats> channel) {
343     ChannelStats stats = getFuture(channel.getStats());
344     Channel.Builder channelBuilder = Channel
345         .newBuilder()
346         .setRef(toChannelRef(channel))
347         .setData(extractChannelData(stats));
348     for (InternalWithLogId subchannel : stats.subchannels) {
349       channelBuilder.addSubchannelRef(toSubchannelRef(subchannel));
350     }
351 
352     return channelBuilder.build();
353   }
354 
extractChannelData(InternalChannelz.ChannelStats stats)355   static ChannelData extractChannelData(InternalChannelz.ChannelStats stats) {
356     ChannelData.Builder builder = ChannelData.newBuilder();
357     builder.setTarget(stats.target)
358         .setState(toChannelConnectivityState(stats.state))
359         .setCallsStarted(stats.callsStarted)
360         .setCallsSucceeded(stats.callsSucceeded)
361         .setCallsFailed(stats.callsFailed)
362         .setLastCallStartedTimestamp(Timestamps.fromNanos(stats.lastCallStartedNanos));
363     if (stats.channelTrace != null) {
364       builder.setTrace(toChannelTrace(stats.channelTrace));
365     }
366     return builder.build();
367   }
368 
toChannelConnectivityState(ConnectivityState s)369   static ChannelConnectivityState toChannelConnectivityState(ConnectivityState s) {
370     return ChannelConnectivityState.newBuilder().setState(toState(s)).build();
371   }
372 
toChannelTrace(InternalChannelz.ChannelTrace channelTrace)373   private static ChannelTrace toChannelTrace(InternalChannelz.ChannelTrace channelTrace) {
374     return ChannelTrace.newBuilder()
375         .setNumEventsLogged(channelTrace.numEventsLogged)
376         .setCreationTimestamp(Timestamps.fromNanos(channelTrace.creationTimeNanos))
377         .addAllEvents(toChannelTraceEvents(channelTrace.events))
378         .build();
379   }
380 
toChannelTraceEvents(List<Event> events)381   private static List<ChannelTraceEvent> toChannelTraceEvents(List<Event> events) {
382     List<ChannelTraceEvent> channelTraceEvents = new ArrayList<>();
383     for (Event event : events) {
384       ChannelTraceEvent.Builder builder = ChannelTraceEvent.newBuilder()
385           .setDescription(event.description)
386           .setSeverity(Severity.valueOf(event.severity.name()))
387           .setTimestamp(Timestamps.fromNanos(event.timestampNanos));
388       if (event.channelRef != null) {
389         builder.setChannelRef(toChannelRef(event.channelRef));
390       }
391       if (event.subchannelRef != null) {
392         builder.setSubchannelRef(toSubchannelRef(event.subchannelRef));
393       }
394       channelTraceEvents.add(builder.build());
395     }
396     return Collections.unmodifiableList(channelTraceEvents);
397   }
398 
toState(ConnectivityState state)399   static State toState(ConnectivityState state) {
400     if (state == null) {
401       return State.UNKNOWN;
402     }
403     try {
404       return Enum.valueOf(State.class, state.name());
405     } catch (IllegalArgumentException e) {
406       return State.UNKNOWN;
407     }
408   }
409 
toSubchannel(InternalInstrumented<ChannelStats> subchannel)410   static Subchannel toSubchannel(InternalInstrumented<ChannelStats> subchannel) {
411     ChannelStats stats = getFuture(subchannel.getStats());
412     Subchannel.Builder subchannelBuilder = Subchannel
413         .newBuilder()
414         .setRef(toSubchannelRef(subchannel))
415         .setData(extractChannelData(stats));
416     Preconditions.checkState(stats.sockets.isEmpty() || stats.subchannels.isEmpty());
417     for (InternalWithLogId childSocket : stats.sockets) {
418       subchannelBuilder.addSocketRef(toSocketRef(childSocket));
419     }
420     for (InternalWithLogId childSubchannel : stats.subchannels) {
421       subchannelBuilder.addSubchannelRef(toSubchannelRef(childSubchannel));
422     }
423     return subchannelBuilder.build();
424   }
425 
toGetTopChannelResponse(RootChannelList rootChannels)426   static GetTopChannelsResponse toGetTopChannelResponse(RootChannelList rootChannels) {
427     GetTopChannelsResponse.Builder responseBuilder = GetTopChannelsResponse
428         .newBuilder()
429         .setEnd(rootChannels.end);
430     for (InternalInstrumented<ChannelStats> c : rootChannels.channels) {
431       responseBuilder.addChannel(ChannelzProtoUtil.toChannel(c));
432     }
433     return responseBuilder.build();
434   }
435 
toGetServersResponse(ServerList servers)436   static GetServersResponse toGetServersResponse(ServerList servers) {
437     GetServersResponse.Builder responseBuilder = GetServersResponse
438         .newBuilder()
439         .setEnd(servers.end);
440     for (InternalInstrumented<ServerStats> s : servers.servers) {
441       responseBuilder.addServer(ChannelzProtoUtil.toServer(s));
442     }
443     return responseBuilder.build();
444   }
445 
toGetServerSocketsResponse(ServerSocketsList serverSockets)446   static GetServerSocketsResponse toGetServerSocketsResponse(ServerSocketsList serverSockets) {
447     GetServerSocketsResponse.Builder responseBuilder = GetServerSocketsResponse
448         .newBuilder()
449         .setEnd(serverSockets.end);
450     for (InternalWithLogId s : serverSockets.sockets) {
451       responseBuilder.addSocketRef(ChannelzProtoUtil.toSocketRef(s));
452     }
453     return responseBuilder.build();
454   }
455 
getFuture(ListenableFuture<T> future)456   private static <T> T getFuture(ListenableFuture<T> future) {
457     try {
458       T ret = future.get();
459       if (ret == null) {
460         throw Status.UNIMPLEMENTED
461             .withDescription("The entity's stats can not be retrieved. "
462                 + "If this is an InProcessTransport this is expected.")
463             .asRuntimeException();
464       }
465       return ret;
466     } catch (InterruptedException e) {
467       throw Status.INTERNAL.withCause(e).asRuntimeException();
468     } catch (ExecutionException e) {
469       throw Status.INTERNAL.withCause(e).asRuntimeException();
470     }
471   }
472 }
473