1 /*
2  * Copyright 2017, OpenCensus Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.opencensus.implcore.stats;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static io.opencensus.implcore.stats.RecordUtils.createAggregationMap;
21 import static io.opencensus.implcore.stats.RecordUtils.createMutableAggregation;
22 import static io.opencensus.implcore.stats.RecordUtils.getTagMap;
23 import static io.opencensus.implcore.stats.RecordUtils.getTagValues;
24 
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.collect.LinkedHashMultimap;
27 import com.google.common.collect.Maps;
28 import com.google.common.collect.Multimap;
29 import io.opencensus.common.Duration;
30 import io.opencensus.common.Function;
31 import io.opencensus.common.Functions;
32 import io.opencensus.common.Timestamp;
33 import io.opencensus.implcore.internal.CheckerFrameworkUtils;
34 import io.opencensus.implcore.internal.CurrentState.State;
35 import io.opencensus.metrics.LabelValue;
36 import io.opencensus.metrics.export.Metric;
37 import io.opencensus.metrics.export.MetricDescriptor;
38 import io.opencensus.metrics.export.MetricDescriptor.Type;
39 import io.opencensus.metrics.export.Point;
40 import io.opencensus.metrics.export.TimeSeries;
41 import io.opencensus.stats.Aggregation;
42 import io.opencensus.stats.AggregationData;
43 import io.opencensus.stats.Measure;
44 import io.opencensus.stats.View;
45 import io.opencensus.stats.ViewData;
46 import io.opencensus.tags.TagContext;
47 import io.opencensus.tags.TagValue;
48 import java.util.ArrayDeque;
49 import java.util.ArrayList;
50 import java.util.Collections;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.Map.Entry;
54 
55 /*>>>
56 import org.checkerframework.checker.nullness.qual.Nullable;
57 */
58 
59 /** A mutable version of {@link ViewData}, used for recording stats and start/end time. */
60 @SuppressWarnings("deprecation")
61 abstract class MutableViewData {
62 
63   @VisibleForTesting static final Timestamp ZERO_TIMESTAMP = Timestamp.create(0, 0);
64 
65   private final View view;
66 
MutableViewData(View view)67   private MutableViewData(View view) {
68     this.view = view;
69   }
70 
71   /**
72    * Constructs a new {@link MutableViewData}.
73    *
74    * @param view the {@code View} linked with this {@code MutableViewData}.
75    * @param start the start {@code Timestamp}.
76    * @return a {@code MutableViewData}.
77    */
create(final View view, final Timestamp start)78   static MutableViewData create(final View view, final Timestamp start) {
79     return view.getWindow()
80         .match(
81             new CreateCumulative(view, start),
82             new CreateInterval(view, start),
83             Functions.<MutableViewData>throwAssertionError());
84   }
85 
86   /** The {@link View} associated with this {@link ViewData}. */
getView()87   View getView() {
88     return view;
89   }
90 
91   @javax.annotation.Nullable
toMetric(Timestamp now, State state)92   abstract Metric toMetric(Timestamp now, State state);
93 
94   /** Record stats with the given tags. */
record( TagContext context, double value, Timestamp timestamp, Map<String, String> attachments)95   abstract void record(
96       TagContext context, double value, Timestamp timestamp, Map<String, String> attachments);
97 
98   /** Convert this {@link MutableViewData} to {@link ViewData}. */
toViewData(Timestamp now, State state)99   abstract ViewData toViewData(Timestamp now, State state);
100 
101   // Clear recorded stats.
clearStats()102   abstract void clearStats();
103 
104   // Resume stats collection, and reset Start Timestamp (for CumulativeMutableViewData), or refresh
105   // bucket list (for InternalMutableViewData).
resumeStatsCollection(Timestamp now)106   abstract void resumeStatsCollection(Timestamp now);
107 
108   private static final class CumulativeMutableViewData extends MutableViewData {
109 
110     private Timestamp start;
111     private final Map<List</*@Nullable*/ TagValue>, MutableAggregation> tagValueAggregationMap =
112         Maps.newHashMap();
113     // Cache a MetricDescriptor to avoid converting View to MetricDescriptor in the future.
114     private final MetricDescriptor metricDescriptor;
115 
CumulativeMutableViewData(View view, Timestamp start)116     private CumulativeMutableViewData(View view, Timestamp start) {
117       super(view);
118       this.start = start;
119       MetricDescriptor metricDescriptor = MetricUtils.viewToMetricDescriptor(view);
120       if (metricDescriptor == null) {
121         throw new AssertionError(
122             "Cumulative view should be converted to a non-null MetricDescriptor.");
123       } else {
124         this.metricDescriptor = metricDescriptor;
125       }
126     }
127 
128     @javax.annotation.Nullable
129     @Override
toMetric(Timestamp now, State state)130     Metric toMetric(Timestamp now, State state) {
131       if (state == State.DISABLED) {
132         return null;
133       }
134       Type type = metricDescriptor.getType();
135       @javax.annotation.Nullable
136       Timestamp startTime = type == Type.GAUGE_INT64 || type == Type.GAUGE_DOUBLE ? null : start;
137       List<TimeSeries> timeSeriesList = new ArrayList<TimeSeries>();
138       for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry :
139           tagValueAggregationMap.entrySet()) {
140         List<LabelValue> labelValues = MetricUtils.tagValuesToLabelValues(entry.getKey());
141         Point point = entry.getValue().toPoint(now);
142         timeSeriesList.add(TimeSeries.createWithOnePoint(labelValues, point, startTime));
143       }
144       return Metric.create(metricDescriptor, timeSeriesList);
145     }
146 
147     @Override
record( TagContext context, double value, Timestamp timestamp, Map<String, String> attachments)148     void record(
149         TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) {
150       List</*@Nullable*/ TagValue> tagValues =
151           getTagValues(getTagMap(context), super.view.getColumns());
152       if (!tagValueAggregationMap.containsKey(tagValues)) {
153         tagValueAggregationMap.put(
154             tagValues,
155             createMutableAggregation(super.view.getAggregation(), super.getView().getMeasure()));
156       }
157       tagValueAggregationMap.get(tagValues).add(value, attachments, timestamp);
158     }
159 
160     @Override
toViewData(Timestamp now, State state)161     ViewData toViewData(Timestamp now, State state) {
162       if (state == State.ENABLED) {
163         return ViewData.create(
164             super.view,
165             createAggregationMap(tagValueAggregationMap, super.view.getMeasure()),
166             ViewData.AggregationWindowData.CumulativeData.create(start, now));
167       } else {
168         // If Stats state is DISABLED, return an empty ViewData.
169         return ViewData.create(
170             super.view,
171             Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(),
172             ViewData.AggregationWindowData.CumulativeData.create(ZERO_TIMESTAMP, ZERO_TIMESTAMP));
173       }
174     }
175 
176     @Override
clearStats()177     void clearStats() {
178       tagValueAggregationMap.clear();
179     }
180 
181     @Override
resumeStatsCollection(Timestamp now)182     void resumeStatsCollection(Timestamp now) {
183       start = now;
184     }
185   }
186 
187   /*
188    * For each IntervalView, we always keep a queue of N + 1 buckets (by default N is 4).
189    * Each bucket has a duration which is interval duration / N.
190    * Ideally:
191    * 1. the buckets should always be up-to-date,
192    * 2. current time should always be within the latest bucket, currently recorded stats should fall
193    *    into the latest bucket,
194    * 3. there are always N buckets before the current one, which holds the stats in the past
195    *    interval duration.
196    *
197    * When getView() is called, we will extract and combine the stats from the current and past
198    * buckets (part of the stats from the oldest bucket could have expired).
199    *
200    * However, in reality, we couldn't track the status of buckets all the time (keep monitoring and
201    * updating the bucket queue will be expensive). When we call record() or getView(), some or all
202    * of the buckets might be outdated, and we will need to "pad" new buckets to the queue and remove
203    * outdated ones. After refreshing buckets, the bucket queue will able to maintain the three
204    * invariants in the ideal situation.
205    *
206    * For example:
207    * 1. We have an IntervalView which has a duration of 8 seconds, we register this view at 10s.
208    * 2. Initially there will be 5 buckets: [2.0, 4.0), [4.0, 6.0), ..., [10.0, 12.0).
209    * 3. If users don't call record() or getView(), bucket queue will remain as it is, and some
210    *    buckets could expire.
211    * 4. Suppose record() is called at 15s, now we need to refresh the bucket queue. We need to add
212    *    two new buckets [12.0, 14.0) and [14.0, 16.0), and remove two expired buckets [2.0, 4.0)
213    *    and [4.0, 6.0)
214    * 5. Suppose record() is called again at 30s, all the current buckets should have expired. We add
215    *    5 new buckets [22.0, 24.0) ... [30.0, 32.0) and remove all the previous buckets.
216    * 6. Suppose users call getView() at 35s, again we need to add two new buckets and remove two
217    *    expired one, so that bucket queue is up-to-date. Now we combine stats from all buckets and
218    *    return the combined IntervalViewData.
219    */
220   private static final class IntervalMutableViewData extends MutableViewData {
221 
222     // TODO(songya): allow customizable bucket size in the future.
223     private static final int N = 4; // IntervalView has N + 1 buckets
224 
225     private final ArrayDeque<IntervalBucket> buckets = new ArrayDeque<IntervalBucket>();
226 
227     private final Duration totalDuration; // Duration of the whole interval.
228     private final Duration bucketDuration; // Duration of a single bucket (totalDuration / N)
229 
IntervalMutableViewData(View view, Timestamp start)230     private IntervalMutableViewData(View view, Timestamp start) {
231       super(view);
232       Duration totalDuration = ((View.AggregationWindow.Interval) view.getWindow()).getDuration();
233       this.totalDuration = totalDuration;
234       this.bucketDuration = Duration.fromMillis(totalDuration.toMillis() / N);
235 
236       // When initializing. add N empty buckets prior to the start timestamp of this
237       // IntervalMutableViewData, so that the last bucket will be the current one in effect.
238       shiftBucketList(N + 1, start);
239     }
240 
241     @javax.annotation.Nullable
242     @Override
toMetric(Timestamp now, State state)243     Metric toMetric(Timestamp now, State state) {
244       return null;
245     }
246 
247     @Override
record( TagContext context, double value, Timestamp timestamp, Map<String, String> attachments)248     void record(
249         TagContext context, double value, Timestamp timestamp, Map<String, String> attachments) {
250       List</*@Nullable*/ TagValue> tagValues =
251           getTagValues(getTagMap(context), super.view.getColumns());
252       refreshBucketList(timestamp);
253       // It is always the last bucket that does the recording.
254       CheckerFrameworkUtils.castNonNull(buckets.peekLast())
255           .record(tagValues, value, attachments, timestamp);
256     }
257 
258     @Override
toViewData(Timestamp now, State state)259     ViewData toViewData(Timestamp now, State state) {
260       refreshBucketList(now);
261       if (state == State.ENABLED) {
262         return ViewData.create(
263             super.view,
264             combineBucketsAndGetAggregationMap(now),
265             ViewData.AggregationWindowData.IntervalData.create(now));
266       } else {
267         // If Stats state is DISABLED, return an empty ViewData.
268         return ViewData.create(
269             super.view,
270             Collections.<List</*@Nullable*/ TagValue>, AggregationData>emptyMap(),
271             ViewData.AggregationWindowData.IntervalData.create(ZERO_TIMESTAMP));
272       }
273     }
274 
275     @Override
clearStats()276     void clearStats() {
277       for (IntervalBucket bucket : buckets) {
278         bucket.clearStats();
279       }
280     }
281 
282     @Override
resumeStatsCollection(Timestamp now)283     void resumeStatsCollection(Timestamp now) {
284       // Refresh bucket list to be ready for stats recording, so that if record() is called right
285       // after stats state is turned back on, record() will be faster.
286       refreshBucketList(now);
287     }
288 
289     // Add new buckets and remove expired buckets by comparing the current timestamp with
290     // timestamp of the last bucket.
refreshBucketList(Timestamp now)291     private void refreshBucketList(Timestamp now) {
292       if (buckets.size() != N + 1) {
293         throw new AssertionError("Bucket list must have exactly " + (N + 1) + " buckets.");
294       }
295       Timestamp startOfLastBucket =
296           CheckerFrameworkUtils.castNonNull(buckets.peekLast()).getStart();
297       // TODO(songya): decide what to do when time goes backwards
298       checkArgument(
299           now.compareTo(startOfLastBucket) >= 0,
300           "Current time must be within or after the last bucket.");
301       long elapsedTimeMillis = now.subtractTimestamp(startOfLastBucket).toMillis();
302       long numOfPadBuckets = elapsedTimeMillis / bucketDuration.toMillis();
303 
304       shiftBucketList(numOfPadBuckets, now);
305     }
306 
307     // Add specified number of new buckets, and remove expired buckets
shiftBucketList(long numOfPadBuckets, Timestamp now)308     private void shiftBucketList(long numOfPadBuckets, Timestamp now) {
309       Timestamp startOfNewBucket;
310 
311       if (!buckets.isEmpty()) {
312         startOfNewBucket =
313             CheckerFrameworkUtils.castNonNull(buckets.peekLast())
314                 .getStart()
315                 .addDuration(bucketDuration);
316       } else {
317         // Initialize bucket list. Should only enter this block once.
318         startOfNewBucket = subtractDuration(now, totalDuration);
319       }
320 
321       if (numOfPadBuckets > N + 1) {
322         // All current buckets expired, need to add N + 1 new buckets. The start time of the latest
323         // bucket will be current time.
324         startOfNewBucket = subtractDuration(now, totalDuration);
325         numOfPadBuckets = N + 1;
326       }
327 
328       for (int i = 0; i < numOfPadBuckets; i++) {
329         buckets.add(
330             new IntervalBucket(
331                 startOfNewBucket,
332                 bucketDuration,
333                 super.view.getAggregation(),
334                 super.view.getMeasure()));
335         startOfNewBucket = startOfNewBucket.addDuration(bucketDuration);
336       }
337 
338       // removed expired buckets
339       while (buckets.size() > N + 1) {
340         buckets.pollFirst();
341       }
342     }
343 
344     // Combine stats within each bucket, aggregate stats by tag values, and return the mapping from
345     // tag values to aggregation data.
combineBucketsAndGetAggregationMap( Timestamp now)346     private Map<List</*@Nullable*/ TagValue>, AggregationData> combineBucketsAndGetAggregationMap(
347         Timestamp now) {
348       // Need to maintain the order of inserted MutableAggregations (inserted based on time order).
349       Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap =
350           LinkedHashMultimap.create();
351 
352       ArrayDeque<IntervalBucket> shallowCopy = new ArrayDeque<IntervalBucket>(buckets);
353 
354       Aggregation aggregation = super.view.getAggregation();
355       Measure measure = super.view.getMeasure();
356       putBucketsIntoMultiMap(shallowCopy, multimap, aggregation, measure, now);
357       Map<List</*@Nullable*/ TagValue>, MutableAggregation> singleMap =
358           aggregateOnEachTagValueList(multimap, aggregation, measure);
359       return createAggregationMap(singleMap, super.getView().getMeasure());
360     }
361 
362     // Put stats within each bucket to a multimap. Each tag value list (map key) could have multiple
363     // mutable aggregations (map value) from different buckets.
putBucketsIntoMultiMap( ArrayDeque<IntervalBucket> buckets, Multimap<List< TagValue>, MutableAggregation> multimap, Aggregation aggregation, Measure measure, Timestamp now)364     private static void putBucketsIntoMultiMap(
365         ArrayDeque<IntervalBucket> buckets,
366         Multimap<List</*@Nullable*/ TagValue>, MutableAggregation> multimap,
367         Aggregation aggregation,
368         Measure measure,
369         Timestamp now) {
370       // Put fractional stats of the head (oldest) bucket.
371       IntervalBucket head = CheckerFrameworkUtils.castNonNull(buckets.peekFirst());
372       IntervalBucket tail = CheckerFrameworkUtils.castNonNull(buckets.peekLast());
373       double fractionTail = tail.getFraction(now);
374       // TODO(songya): decide what to do when time goes backwards
375       checkArgument(
376           0.0 <= fractionTail && fractionTail <= 1.0,
377           "Fraction " + fractionTail + " should be within [0.0, 1.0].");
378       double fractionHead = 1.0 - fractionTail;
379       putFractionalMutableAggregationsToMultiMap(
380           head.getTagValueAggregationMap(), multimap, aggregation, measure, fractionHead);
381 
382       // Put whole data of other buckets.
383       boolean shouldSkipFirst = true;
384       for (IntervalBucket bucket : buckets) {
385         if (shouldSkipFirst) {
386           shouldSkipFirst = false;
387           continue; // skip the first bucket
388         }
389         for (Entry<List</*@Nullable*/ TagValue>, MutableAggregation> entry :
390             bucket.getTagValueAggregationMap().entrySet()) {
391           multimap.put(entry.getKey(), entry.getValue());
392         }
393       }
394     }
395 
396     // Put stats within one bucket into multimap, multiplied by a given fraction.
putFractionalMutableAggregationsToMultiMap( Map<T, MutableAggregation> mutableAggrMap, Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure, double fraction)397     private static <T> void putFractionalMutableAggregationsToMultiMap(
398         Map<T, MutableAggregation> mutableAggrMap,
399         Multimap<T, MutableAggregation> multimap,
400         Aggregation aggregation,
401         Measure measure,
402         double fraction) {
403       for (Entry<T, MutableAggregation> entry : mutableAggrMap.entrySet()) {
404         // Initially empty MutableAggregations.
405         MutableAggregation fractionalMutableAgg = createMutableAggregation(aggregation, measure);
406         fractionalMutableAgg.combine(entry.getValue(), fraction);
407         multimap.put(entry.getKey(), fractionalMutableAgg);
408       }
409     }
410 
411     // For each tag value list (key of AggregationMap), combine mutable aggregations into one
412     // mutable aggregation, thus convert the multimap into a single map.
aggregateOnEachTagValueList( Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure)413     private static <T> Map<T, MutableAggregation> aggregateOnEachTagValueList(
414         Multimap<T, MutableAggregation> multimap, Aggregation aggregation, Measure measure) {
415       Map<T, MutableAggregation> map = Maps.newHashMap();
416       for (T tagValues : multimap.keySet()) {
417         // Initially empty MutableAggregations.
418         MutableAggregation combinedAggregation = createMutableAggregation(aggregation, measure);
419         for (MutableAggregation mutableAggregation : multimap.get(tagValues)) {
420           combinedAggregation.combine(mutableAggregation, 1.0);
421         }
422         map.put(tagValues, combinedAggregation);
423       }
424       return map;
425     }
426 
427     // Subtract a Duration from a Timestamp, and return a new Timestamp.
subtractDuration(Timestamp timestamp, Duration duration)428     private static Timestamp subtractDuration(Timestamp timestamp, Duration duration) {
429       return timestamp.addDuration(Duration.create(-duration.getSeconds(), -duration.getNanos()));
430     }
431   }
432 
433   private static final class CreateCumulative
434       implements Function<View.AggregationWindow.Cumulative, MutableViewData> {
435     @Override
apply(View.AggregationWindow.Cumulative arg)436     public MutableViewData apply(View.AggregationWindow.Cumulative arg) {
437       return new CumulativeMutableViewData(view, start);
438     }
439 
440     private final View view;
441     private final Timestamp start;
442 
CreateCumulative(View view, Timestamp start)443     private CreateCumulative(View view, Timestamp start) {
444       this.view = view;
445       this.start = start;
446     }
447   }
448 
449   private static final class CreateInterval
450       implements Function<View.AggregationWindow.Interval, MutableViewData> {
451     @Override
apply(View.AggregationWindow.Interval arg)452     public MutableViewData apply(View.AggregationWindow.Interval arg) {
453       return new IntervalMutableViewData(view, start);
454     }
455 
456     private final View view;
457     private final Timestamp start;
458 
CreateInterval(View view, Timestamp start)459     private CreateInterval(View view, Timestamp start) {
460       this.view = view;
461       this.start = start;
462     }
463   }
464 }
465