• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
18 #include <stdint.h>
19 #include <unistd.h>
20 #include <time.h>
21 
22 #include "common/libs/time/monotonic_time.h"
23 
24 /**
25  * This abstract class simulates a buffer that either fills or empties at
26  * a specified rate.
27  *
28  * The simulated buffer automatically fills or empties at a specific rate.
29  *
30  * An item is the thing contained in the simulated buffer. Items are moved
31  * in and out of the buffer without subdivision.
32  *
33  * An integral number of items must arrive / depart in each second.
34  * This number is stored in items_per_second_
35  *
36  * items_per_second * 2000000000 must fit within an int64_t. This
37  * works if items_per_second is represented by an int32.
38  *
39  * The base class does have the concept of capacity, but doesn't use it.
40  * It is included here to simplify unit testing.
41  *
42  * For actual use, see SimulatedInputBuffer and SimulatedOutputBuffer below.
43  */
44 class SimulatedBufferBase {
45  public:
46   static inline int64_t divide_and_round_up(int64_t q, int64_t d) {
47     return q / d + ((q % d) != 0);
48   }
49 
50   SimulatedBufferBase(
51       int32_t items_per_second,
52       int64_t simulated_item_capacity,
53       cvd::time::MonotonicTimePointFactory* clock =
54         cvd::time::MonotonicTimePointFactory::GetInstance()) :
55     clock_(clock),
56     current_item_num_(0),
57     base_item_num_(0),
58     simulated_item_capacity_(simulated_item_capacity),
59     items_per_second_(items_per_second),
60     initialize_(true),
61     paused_(false) { }
62 
63   virtual ~SimulatedBufferBase() { }
64 
65   int64_t GetCurrentItemNum() {
66     Update();
67     return current_item_num_;
68   }
69 
70   const cvd::time::MonotonicTimePoint GetLastUpdatedTime() const {
71     return current_time_;
72   }
73 
74   // Sleep for the given amount of time. Subclasses may override this to use
75   // different sleep calls.
76   // Sleep is best-effort. The code assumes that the acutal sleep time may be
77   // greater or less than the time requested.
78   virtual void SleepUntilTime(const cvd::time::MonotonicTimePoint& in) {
79     struct timespec ts;
80     in.ToTimespec(&ts);
81     clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &ts, NULL);
82   }
83 
84   // The time counter may not start at 0. Concrete classes should call this
85   // to allow the buffer simulation to read the current time number and
86   // initialize its internal state.
87   virtual void Init() {
88     if (initialize_) {
89       clock_->FetchCurrentTime(&base_time_);
90       current_time_ = base_time_;
91       initialize_ = false;
92     }
93   }
94 
95   virtual void Update() {
96     if (initialize_) {
97       Init();
98     }
99     cvd::time::MonotonicTimePoint now;
100     clock_->FetchCurrentTime(&now);
101     // We can't call FetchCurrentTime() in the constuctor because a subclass may
102     // want to override it, so we initialze the times to 0. If we detect this
103     // case go ahead and initialize to a current timestamp.
104     if (paused_) {
105       base_time_ += now - current_time_;
106       current_time_ = now;
107       return;
108     }
109     // Avoid potential overflow by limiting the scaling to one time second.
110     // There is no round-off error here because the bases are adjusted for full
111     // seconds.
112     // There is no issue with int64 overflow because 2's compliment subtraction
113     // is immune to overflow.
114     // However, this does assume that kNanosecondsPerSecond * items_per_second_
115     // fits in an int64.
116     cvd::time::Seconds seconds(now - base_time_);
117     base_time_ += seconds;
118     base_item_num_ += seconds.count() * items_per_second_;
119     current_time_ = now;
120     current_item_num_ =
121         cvd::time::Nanoseconds(now - base_time_).count() *
122         items_per_second_ / cvd::time::kNanosecondsPerSecond +
123         base_item_num_;
124   }
125 
126   // If set to true new items will not be created.
127   bool SetPaused(bool new_state) {
128     bool rval = paused_;
129     Update();
130     paused_ = new_state;
131     return rval;
132   }
133 
134   // Calculate the TimePoint that corresponds to an item.
135   // Caution: This may not return a correct time for items in the past.
136   cvd::time::MonotonicTimePoint CalculateItemTime(int64_t item) {
137     int64_t seconds = (item - base_item_num_) / items_per_second_;
138     int64_t new_base_item_num = base_item_num_ + seconds * items_per_second_;
139     return base_time_ + cvd::time::Seconds(seconds) +
140       cvd::time::Nanoseconds(divide_and_round_up(
141           (item - new_base_item_num) *
142           cvd::time::kNanosecondsPerSecond,
143           items_per_second_));
144   }
145 
146   // Sleep until the given item number is generated. If the generator is
147   // paused unpause it to make the sleep finite.
148   void SleepUntilItem(int64_t item) {
149     if (paused_) {
150       SetPaused(false);
151     }
152     cvd::time::MonotonicTimePoint desired_time =
153         CalculateItemTime(item);
154     while (1) {
155       Update();
156       if (current_item_num_ - item >= 0) {
157         return;
158       }
159       SleepUntilTime(desired_time);
160     }
161   }
162 
163  protected:
164   // Source of the timepoints.
165   cvd::time::MonotonicTimePointFactory* clock_;
166   // Time when the other values in the structure were updated.
167   cvd::time::MonotonicTimePoint current_time_;
168   // Most recent time when there was no round-off error between the clock and
169   // items.
170   cvd::time::MonotonicTimePoint base_time_;
171   // Number of the current item.
172   int64_t current_item_num_;
173   // Most recent item number where there was no round-off error between the
174   // clock and items.
175   int64_t base_item_num_;
176   // Simulated_Item_Capacity of the buffer in items.
177   int64_t simulated_item_capacity_;
178   // Number of items that are created in 1s. A typical number would be 48000.
179   int32_t items_per_second_;
180   bool initialize_;
181   // If true then don't generate new items.
182   bool paused_;
183 };
184 
185 /**
186  * This is a simulation of an output buffer that drains at a constant rate.
187  */
188 class SimulatedOutputBuffer : public SimulatedBufferBase {
189  public:
190   SimulatedOutputBuffer(
191       int64_t item_rate,
192       int64_t simulated_item_capacity,
193       cvd::time::MonotonicTimePointFactory* clock =
194         cvd::time::MonotonicTimePointFactory::GetInstance()) :
195       SimulatedBufferBase(item_rate, simulated_item_capacity, clock) {
196     output_buffer_item_num_ = current_item_num_;
197   }
198 
199   void Update() override {
200     SimulatedBufferBase::Update();
201     if ((output_buffer_item_num_ - current_item_num_) < 0) {
202       // We ran out of items at some point in the past. However, the
203       // output capactiy can't be negative.
204       output_buffer_item_num_ = current_item_num_;
205     }
206   }
207 
208   int64_t AddToOutputBuffer(int64_t num_new_items, bool block) {
209     Update();
210     // The easy case: num_new_items fit in the bucket.
211     if ((output_buffer_item_num_ + num_new_items - current_item_num_) <=
212         simulated_item_capacity_) {
213       output_buffer_item_num_ += num_new_items;
214       return num_new_items;
215     }
216     // If we're non-blocking accept enough items to fill the output.
217     if (!block) {
218       int64_t used = current_item_num_ + simulated_item_capacity_ -
219           output_buffer_item_num_;
220       output_buffer_item_num_ = current_item_num_ + simulated_item_capacity_;
221       return used;
222     }
223     int64_t new_output_buffer_item_num = output_buffer_item_num_ + num_new_items;
224     SleepUntilItem(new_output_buffer_item_num - simulated_item_capacity_);
225     output_buffer_item_num_ = new_output_buffer_item_num;
226     return num_new_items;
227   }
228 
229   int64_t GetNextOutputBufferItemNum() {
230     Update();
231     return output_buffer_item_num_;
232   }
233 
234   cvd::time::MonotonicTimePoint GetNextOutputBufferItemTime() {
235     Update();
236     return CalculateItemTime(output_buffer_item_num_);
237   }
238 
239   int64_t GetOutputBufferSize() {
240     Update();
241     return output_buffer_item_num_ - current_item_num_;
242   }
243 
244   void Drain() {
245     SleepUntilItem(output_buffer_item_num_);
246   }
247 
248  protected:
249   int64_t output_buffer_item_num_;
250 };
251 
252 /**
253  * Simulates an input buffer that fills at a constant rate.
254  */
255 class SimulatedInputBuffer : public SimulatedBufferBase {
256  public:
257   SimulatedInputBuffer(
258       int64_t item_rate,
259       int64_t simulated_item_capacity,
260       cvd::time::MonotonicTimePointFactory* clock =
261         cvd::time::MonotonicTimePointFactory::GetInstance()) :
262       SimulatedBufferBase(item_rate, simulated_item_capacity, clock) {
263     input_buffer_item_num_ = current_item_num_;
264     lost_input_items_ = 0;
265   }
266 
267   void Update() override {
268     SimulatedBufferBase::Update();
269     if ((current_item_num_ - input_buffer_item_num_) >
270         simulated_item_capacity_) {
271       // The buffer overflowed at some point in the past. Account for the lost
272       // times.
273       int64_t new_input_buffer_item_num =
274           current_item_num_ - simulated_item_capacity_;
275       lost_input_items_ +=
276           new_input_buffer_item_num - input_buffer_item_num_;
277       input_buffer_item_num_ = new_input_buffer_item_num;
278     }
279   }
280 
281   int64_t RemoveFromInputBuffer(int64_t num_items_wanted, bool block) {
282     Update();
283     if (!block) {
284       int64_t num_items_available = current_item_num_ - input_buffer_item_num_;
285       if (num_items_available < num_items_wanted) {
286         input_buffer_item_num_ += num_items_available;
287         return num_items_available;
288       } else {
289         input_buffer_item_num_ += num_items_wanted;
290         return num_items_wanted;
291       }
292     }
293     // Calculate the item number that is being claimed. Sleep until it appears.
294     // Advancing input_buffer_item_num_ causes a negative value to be compared
295     // to the capacity, effectively disabling the overflow detection code
296     // in Update().
297     input_buffer_item_num_ += num_items_wanted;
298     while (input_buffer_item_num_ - current_item_num_ > 0) {
299       SleepUntilItem(input_buffer_item_num_);
300     }
301     return num_items_wanted;
302   }
303 
304   int64_t GetLostInputItems() {
305     Update();
306     int64_t rval = lost_input_items_;
307     lost_input_items_ = 0;
308     return rval;
309   }
310 
311  protected:
312   int64_t input_buffer_item_num_;
313   int64_t lost_input_items_;
314 };
315