1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.grpclb;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.protobuf.util.Timestamps;
22 import io.grpc.CallOptions;
23 import io.grpc.ClientStreamTracer;
24 import io.grpc.Metadata;
25 import io.grpc.Status;
26 import io.grpc.internal.TimeProvider;
27 import io.grpc.lb.v1.ClientStats;
28 import io.grpc.lb.v1.ClientStatsPerToken;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
34 import javax.annotation.concurrent.GuardedBy;
35 import javax.annotation.concurrent.ThreadSafe;
36 
37 /**
38  * Record and aggregate client-side load data for GRPCLB.  This records load occurred during the
39  * span of an LB stream with the remote load-balancer.
40  */
41 @ThreadSafe
42 final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory {
43 
44   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsStartedUpdater =
45       AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsStarted");
46   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsFinishedUpdater =
47       AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFinished");
48   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsFailedToSendUpdater =
49       AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFailedToSend");
50   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder>
51       callsFinishedKnownReceivedUpdater =
52           AtomicLongFieldUpdater.newUpdater(
53               GrpclbClientLoadRecorder.class, "callsFinishedKnownReceived");
54 
55   private final TimeProvider time;
56   @SuppressWarnings("unused")
57   private volatile long callsStarted;
58   @SuppressWarnings("unused")
59   private volatile long callsFinished;
60 
61   private static final class LongHolder {
62     long num;
63   }
64 
65   // Specific finish types
66   @GuardedBy("this")
67   private Map<String, LongHolder> callsDroppedPerToken = new HashMap<String, LongHolder>(1);
68   @SuppressWarnings("unused")
69   private volatile long callsFailedToSend;
70   @SuppressWarnings("unused")
71   private volatile long callsFinishedKnownReceived;
72 
GrpclbClientLoadRecorder(TimeProvider time)73   GrpclbClientLoadRecorder(TimeProvider time) {
74     this.time = checkNotNull(time, "time provider");
75   }
76 
77   @Override
newClientStreamTracer(CallOptions callOptions, Metadata headers)78   public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
79     callsStartedUpdater.getAndIncrement(this);
80     return new StreamTracer();
81   }
82 
83   /**
84    * Records that a request has been dropped as instructed by the remote balancer.
85    */
recordDroppedRequest(String token)86   void recordDroppedRequest(String token) {
87     callsStartedUpdater.getAndIncrement(this);
88     callsFinishedUpdater.getAndIncrement(this);
89 
90     synchronized (this) {
91       LongHolder holder;
92       if ((holder = callsDroppedPerToken.get(token)) == null) {
93         callsDroppedPerToken.put(token, (holder = new LongHolder()));
94       }
95       holder.num++;
96     }
97   }
98 
99   /**
100    * Generate the report with the data recorded this LB stream since the last report.
101    */
generateLoadReport()102   ClientStats generateLoadReport() {
103     ClientStats.Builder statsBuilder =
104         ClientStats.newBuilder()
105         .setTimestamp(Timestamps.fromNanos(time.currentTimeNanos()))
106         .setNumCallsStarted(callsStartedUpdater.getAndSet(this, 0))
107         .setNumCallsFinished(callsFinishedUpdater.getAndSet(this, 0))
108         .setNumCallsFinishedWithClientFailedToSend(callsFailedToSendUpdater.getAndSet(this, 0))
109         .setNumCallsFinishedKnownReceived(callsFinishedKnownReceivedUpdater.getAndSet(this, 0));
110 
111     Map<String, LongHolder> localCallsDroppedPerToken = Collections.emptyMap();
112     synchronized (this) {
113       if (!callsDroppedPerToken.isEmpty()) {
114         localCallsDroppedPerToken = callsDroppedPerToken;
115         callsDroppedPerToken = new HashMap<String, LongHolder>(localCallsDroppedPerToken.size());
116       }
117     }
118     for (Entry<String, LongHolder> entry : localCallsDroppedPerToken.entrySet()) {
119       statsBuilder.addCallsFinishedWithDrop(
120           ClientStatsPerToken.newBuilder()
121               .setLoadBalanceToken(entry.getKey())
122               .setNumCalls(entry.getValue().num)
123               .build());
124     }
125     return statsBuilder.build();
126   }
127 
128   private class StreamTracer extends ClientStreamTracer {
129     private volatile boolean headersSent;
130     private volatile boolean anythingReceived;
131 
132     @Override
outboundHeaders()133     public void outboundHeaders() {
134       headersSent = true;
135     }
136 
137     @Override
inboundHeaders()138     public void inboundHeaders() {
139       anythingReceived = true;
140     }
141 
142     @Override
inboundMessage(int seqNo)143     public void inboundMessage(int seqNo) {
144       anythingReceived = true;
145     }
146 
147     @Override
streamClosed(Status status)148     public void streamClosed(Status status) {
149       callsFinishedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
150       if (!headersSent) {
151         callsFailedToSendUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
152       }
153       if (anythingReceived) {
154         callsFinishedKnownReceivedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
155       }
156     }
157   }
158 }
159