1// Copyright 2015 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Package timeseries implements a time series structure for stats collection.
6package timeseries
7
8import (
9	"fmt"
10	"log"
11	"time"
12)
13
14const (
15	timeSeriesNumBuckets       = 64
16	minuteHourSeriesNumBuckets = 60
17)
18
19var timeSeriesResolutions = []time.Duration{
20	1 * time.Second,
21	10 * time.Second,
22	1 * time.Minute,
23	10 * time.Minute,
24	1 * time.Hour,
25	6 * time.Hour,
26	24 * time.Hour,          // 1 day
27	7 * 24 * time.Hour,      // 1 week
28	4 * 7 * 24 * time.Hour,  // 4 weeks
29	16 * 7 * 24 * time.Hour, // 16 weeks
30}
31
32var minuteHourSeriesResolutions = []time.Duration{
33	1 * time.Second,
34	1 * time.Minute,
35}
36
37// An Observable is a kind of data that can be aggregated in a time series.
38type Observable interface {
39	Multiply(ratio float64)    // Multiplies the data in self by a given ratio
40	Add(other Observable)      // Adds the data from a different observation to self
41	Clear()                    // Clears the observation so it can be reused.
42	CopyFrom(other Observable) // Copies the contents of a given observation to self
43}
44
45// Float attaches the methods of Observable to a float64.
46type Float float64
47
48// NewFloat returns a Float.
49func NewFloat() Observable {
50	f := Float(0)
51	return &f
52}
53
54// String returns the float as a string.
55func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
56
57// Value returns the float's value.
58func (f *Float) Value() float64 { return float64(*f) }
59
60func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
61
62func (f *Float) Add(other Observable) {
63	o := other.(*Float)
64	*f += *o
65}
66
67func (f *Float) Clear() { *f = 0 }
68
69func (f *Float) CopyFrom(other Observable) {
70	o := other.(*Float)
71	*f = *o
72}
73
74// A Clock tells the current time.
75type Clock interface {
76	Time() time.Time
77}
78
79type defaultClock int
80
81var defaultClockInstance defaultClock
82
83func (defaultClock) Time() time.Time { return time.Now() }
84
85// Information kept per level. Each level consists of a circular list of
86// observations. The start of the level may be derived from end and the
87// len(buckets) * sizeInMillis.
88type tsLevel struct {
89	oldest   int               // index to oldest bucketed Observable
90	newest   int               // index to newest bucketed Observable
91	end      time.Time         // end timestamp for this level
92	size     time.Duration     // duration of the bucketed Observable
93	buckets  []Observable      // collections of observations
94	provider func() Observable // used for creating new Observable
95}
96
97func (l *tsLevel) Clear() {
98	l.oldest = 0
99	l.newest = len(l.buckets) - 1
100	l.end = time.Time{}
101	for i := range l.buckets {
102		if l.buckets[i] != nil {
103			l.buckets[i].Clear()
104			l.buckets[i] = nil
105		}
106	}
107}
108
109func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
110	l.size = size
111	l.provider = f
112	l.buckets = make([]Observable, numBuckets)
113}
114
115// Keeps a sequence of levels. Each level is responsible for storing data at
116// a given resolution. For example, the first level stores data at a one
117// minute resolution while the second level stores data at a one hour
118// resolution.
119
120// Each level is represented by a sequence of buckets. Each bucket spans an
121// interval equal to the resolution of the level. New observations are added
122// to the last bucket.
123type timeSeries struct {
124	provider    func() Observable // make more Observable
125	numBuckets  int               // number of buckets in each level
126	levels      []*tsLevel        // levels of bucketed Observable
127	lastAdd     time.Time         // time of last Observable tracked
128	total       Observable        // convenient aggregation of all Observable
129	clock       Clock             // Clock for getting current time
130	pending     Observable        // observations not yet bucketed
131	pendingTime time.Time         // what time are we keeping in pending
132	dirty       bool              // if there are pending observations
133}
134
135// init initializes a level according to the supplied criteria.
136func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
137	ts.provider = f
138	ts.numBuckets = numBuckets
139	ts.clock = clock
140	ts.levels = make([]*tsLevel, len(resolutions))
141
142	for i := range resolutions {
143		if i > 0 && resolutions[i-1] >= resolutions[i] {
144			log.Print("timeseries: resolutions must be monotonically increasing")
145			break
146		}
147		newLevel := new(tsLevel)
148		newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider)
149		ts.levels[i] = newLevel
150	}
151
152	ts.Clear()
153}
154
155// Clear removes all observations from the time series.
156func (ts *timeSeries) Clear() {
157	ts.lastAdd = time.Time{}
158	ts.total = ts.resetObservation(ts.total)
159	ts.pending = ts.resetObservation(ts.pending)
160	ts.pendingTime = time.Time{}
161	ts.dirty = false
162
163	for i := range ts.levels {
164		ts.levels[i].Clear()
165	}
166}
167
168// Add records an observation at the current time.
169func (ts *timeSeries) Add(observation Observable) {
170	ts.AddWithTime(observation, ts.clock.Time())
171}
172
173// AddWithTime records an observation at the specified time.
174func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
175
176	smallBucketDuration := ts.levels[0].size
177
178	if t.After(ts.lastAdd) {
179		ts.lastAdd = t
180	}
181
182	if t.After(ts.pendingTime) {
183		ts.advance(t)
184		ts.mergePendingUpdates()
185		ts.pendingTime = ts.levels[0].end
186		ts.pending.CopyFrom(observation)
187		ts.dirty = true
188	} else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) {
189		// The observation is close enough to go into the pending bucket.
190		// This compensates for clock skewing and small scheduling delays
191		// by letting the update stay in the fast path.
192		ts.pending.Add(observation)
193		ts.dirty = true
194	} else {
195		ts.mergeValue(observation, t)
196	}
197}
198
199// mergeValue inserts the observation at the specified time in the past into all levels.
200func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
201	for _, level := range ts.levels {
202		index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
203		if 0 <= index && index < ts.numBuckets {
204			bucketNumber := (level.oldest + index) % ts.numBuckets
205			if level.buckets[bucketNumber] == nil {
206				level.buckets[bucketNumber] = level.provider()
207			}
208			level.buckets[bucketNumber].Add(observation)
209		}
210	}
211	ts.total.Add(observation)
212}
213
214// mergePendingUpdates applies the pending updates into all levels.
215func (ts *timeSeries) mergePendingUpdates() {
216	if ts.dirty {
217		ts.mergeValue(ts.pending, ts.pendingTime)
218		ts.pending = ts.resetObservation(ts.pending)
219		ts.dirty = false
220	}
221}
222
223// advance cycles the buckets at each level until the latest bucket in
224// each level can hold the time specified.
225func (ts *timeSeries) advance(t time.Time) {
226	if !t.After(ts.levels[0].end) {
227		return
228	}
229	for i := 0; i < len(ts.levels); i++ {
230		level := ts.levels[i]
231		if !level.end.Before(t) {
232			break
233		}
234
235		// If the time is sufficiently far, just clear the level and advance
236		// directly.
237		if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
238			for _, b := range level.buckets {
239				ts.resetObservation(b)
240			}
241			level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds())
242		}
243
244		for t.After(level.end) {
245			level.end = level.end.Add(level.size)
246			level.newest = level.oldest
247			level.oldest = (level.oldest + 1) % ts.numBuckets
248			ts.resetObservation(level.buckets[level.newest])
249		}
250
251		t = level.end
252	}
253}
254
255// Latest returns the sum of the num latest buckets from the level.
256func (ts *timeSeries) Latest(level, num int) Observable {
257	now := ts.clock.Time()
258	if ts.levels[0].end.Before(now) {
259		ts.advance(now)
260	}
261
262	ts.mergePendingUpdates()
263
264	result := ts.provider()
265	l := ts.levels[level]
266	index := l.newest
267
268	for i := 0; i < num; i++ {
269		if l.buckets[index] != nil {
270			result.Add(l.buckets[index])
271		}
272		if index == 0 {
273			index = ts.numBuckets
274		}
275		index--
276	}
277
278	return result
279}
280
281// LatestBuckets returns a copy of the num latest buckets from level.
282func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
283	if level < 0 || level > len(ts.levels) {
284		log.Print("timeseries: bad level argument: ", level)
285		return nil
286	}
287	if num < 0 || num >= ts.numBuckets {
288		log.Print("timeseries: bad num argument: ", num)
289		return nil
290	}
291
292	results := make([]Observable, num)
293	now := ts.clock.Time()
294	if ts.levels[0].end.Before(now) {
295		ts.advance(now)
296	}
297
298	ts.mergePendingUpdates()
299
300	l := ts.levels[level]
301	index := l.newest
302
303	for i := 0; i < num; i++ {
304		result := ts.provider()
305		results[i] = result
306		if l.buckets[index] != nil {
307			result.CopyFrom(l.buckets[index])
308		}
309
310		if index == 0 {
311			index = ts.numBuckets
312		}
313		index -= 1
314	}
315	return results
316}
317
318// ScaleBy updates observations by scaling by factor.
319func (ts *timeSeries) ScaleBy(factor float64) {
320	for _, l := range ts.levels {
321		for i := 0; i < ts.numBuckets; i++ {
322			l.buckets[i].Multiply(factor)
323		}
324	}
325
326	ts.total.Multiply(factor)
327	ts.pending.Multiply(factor)
328}
329
330// Range returns the sum of observations added over the specified time range.
331// If start or finish times don't fall on bucket boundaries of the same
332// level, then return values are approximate answers.
333func (ts *timeSeries) Range(start, finish time.Time) Observable {
334	return ts.ComputeRange(start, finish, 1)[0]
335}
336
337// Recent returns the sum of observations from the last delta.
338func (ts *timeSeries) Recent(delta time.Duration) Observable {
339	now := ts.clock.Time()
340	return ts.Range(now.Add(-delta), now)
341}
342
343// Total returns the total of all observations.
344func (ts *timeSeries) Total() Observable {
345	ts.mergePendingUpdates()
346	return ts.total
347}
348
349// ComputeRange computes a specified number of values into a slice using
350// the observations recorded over the specified time period. The return
351// values are approximate if the start or finish times don't fall on the
352// bucket boundaries at the same level or if the number of buckets spanning
353// the range is not an integral multiple of num.
354func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
355	if start.After(finish) {
356		log.Printf("timeseries: start > finish, %v>%v", start, finish)
357		return nil
358	}
359
360	if num < 0 {
361		log.Printf("timeseries: num < 0, %v", num)
362		return nil
363	}
364
365	results := make([]Observable, num)
366
367	for _, l := range ts.levels {
368		if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
369			ts.extract(l, start, finish, num, results)
370			return results
371		}
372	}
373
374	// Failed to find a level that covers the desired range. So just
375	// extract from the last level, even if it doesn't cover the entire
376	// desired range.
377	ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
378
379	return results
380}
381
382// RecentList returns the specified number of values in slice over the most
383// recent time period of the specified range.
384func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
385	if delta < 0 {
386		return nil
387	}
388	now := ts.clock.Time()
389	return ts.ComputeRange(now.Add(-delta), now, num)
390}
391
392// extract returns a slice of specified number of observations from a given
393// level over a given range.
394func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
395	ts.mergePendingUpdates()
396
397	srcInterval := l.size
398	dstInterval := finish.Sub(start) / time.Duration(num)
399	dstStart := start
400	srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))
401
402	srcIndex := 0
403
404	// Where should scanning start?
405	if dstStart.After(srcStart) {
406		advance := dstStart.Sub(srcStart) / srcInterval
407		srcIndex += int(advance)
408		srcStart = srcStart.Add(advance * srcInterval)
409	}
410
411	// The i'th value is computed as show below.
412	// interval = (finish/start)/num
413	// i'th value = sum of observation in range
414	//   [ start + i       * interval,
415	//     start + (i + 1) * interval )
416	for i := 0; i < num; i++ {
417		results[i] = ts.resetObservation(results[i])
418		dstEnd := dstStart.Add(dstInterval)
419		for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
420			srcEnd := srcStart.Add(srcInterval)
421			if srcEnd.After(ts.lastAdd) {
422				srcEnd = ts.lastAdd
423			}
424
425			if !srcEnd.Before(dstStart) {
426				srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
427				if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
428					// dst completely contains src.
429					if srcValue != nil {
430						results[i].Add(srcValue)
431					}
432				} else {
433					// dst partially overlaps src.
434					overlapStart := maxTime(srcStart, dstStart)
435					overlapEnd := minTime(srcEnd, dstEnd)
436					base := srcEnd.Sub(srcStart)
437					fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()
438
439					used := ts.provider()
440					if srcValue != nil {
441						used.CopyFrom(srcValue)
442					}
443					used.Multiply(fraction)
444					results[i].Add(used)
445				}
446
447				if srcEnd.After(dstEnd) {
448					break
449				}
450			}
451			srcIndex++
452			srcStart = srcStart.Add(srcInterval)
453		}
454		dstStart = dstStart.Add(dstInterval)
455	}
456}
457
458// resetObservation clears the content so the struct may be reused.
459func (ts *timeSeries) resetObservation(observation Observable) Observable {
460	if observation == nil {
461		observation = ts.provider()
462	} else {
463		observation.Clear()
464	}
465	return observation
466}
467
468// TimeSeries tracks data at granularities from 1 second to 16 weeks.
469type TimeSeries struct {
470	timeSeries
471}
472
473// NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
474func NewTimeSeries(f func() Observable) *TimeSeries {
475	return NewTimeSeriesWithClock(f, defaultClockInstance)
476}
477
478// NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
479// assigning timestamps.
480func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
481	ts := new(TimeSeries)
482	ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
483	return ts
484}
485
486// MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
487type MinuteHourSeries struct {
488	timeSeries
489}
490
491// NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
492func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
493	return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
494}
495
496// NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
497// assigning timestamps.
498func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
499	ts := new(MinuteHourSeries)
500	ts.timeSeries.init(minuteHourSeriesResolutions, f,
501		minuteHourSeriesNumBuckets, clock)
502	return ts
503}
504
505func (ts *MinuteHourSeries) Minute() Observable {
506	return ts.timeSeries.Latest(0, 60)
507}
508
509func (ts *MinuteHourSeries) Hour() Observable {
510	return ts.timeSeries.Latest(1, 60)
511}
512
513func minTime(a, b time.Time) time.Time {
514	if a.Before(b) {
515		return a
516	}
517	return b
518}
519
520func maxTime(a, b time.Time) time.Time {
521	if a.After(b) {
522		return a
523	}
524	return b
525}
526