1 /* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include <stdint.h> 19 #include <unistd.h> 20 #include <time.h> 21 22 #include "common/libs/time/monotonic_time.h" 23 24 /** 25 * This abstract class simulates a buffer that either fills or empties at 26 * a specified rate. 27 * 28 * The simulated buffer automatically fills or empties at a specific rate. 29 * 30 * An item is the thing contained in the simulated buffer. Items are moved 31 * in and out of the buffer without subdivision. 32 * 33 * An integral number of items must arrive / depart in each second. 34 * This number is stored in items_per_second_ 35 * 36 * items_per_second * 2000000000 must fit within an int64_t. This 37 * works if items_per_second is represented by an int32. 38 * 39 * The base class does have the concept of capacity, but doesn't use it. 40 * It is included here to simplify unit testing. 41 * 42 * For actual use, see SimulatedInputBuffer and SimulatedOutputBuffer below. 43 */ 44 class SimulatedBufferBase { 45 public: 46 static inline int64_t divide_and_round_up(int64_t q, int64_t d) { 47 return q / d + ((q % d) != 0); 48 } 49 50 SimulatedBufferBase( 51 int32_t items_per_second, 52 int64_t simulated_item_capacity, 53 cvd::time::MonotonicTimePointFactory* clock = 54 cvd::time::MonotonicTimePointFactory::GetInstance()) : 55 clock_(clock), 56 current_item_num_(0), 57 base_item_num_(0), 58 simulated_item_capacity_(simulated_item_capacity), 59 items_per_second_(items_per_second), 60 initialize_(true), 61 paused_(false) { } 62 63 virtual ~SimulatedBufferBase() { } 64 65 int64_t GetCurrentItemNum() { 66 Update(); 67 return current_item_num_; 68 } 69 70 const cvd::time::MonotonicTimePoint GetLastUpdatedTime() const { 71 return current_time_; 72 } 73 74 // Sleep for the given amount of time. Subclasses may override this to use 75 // different sleep calls. 76 // Sleep is best-effort. The code assumes that the acutal sleep time may be 77 // greater or less than the time requested. 78 virtual void SleepUntilTime(const cvd::time::MonotonicTimePoint& in) { 79 struct timespec ts; 80 in.ToTimespec(&ts); 81 clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &ts, NULL); 82 } 83 84 // The time counter may not start at 0. Concrete classes should call this 85 // to allow the buffer simulation to read the current time number and 86 // initialize its internal state. 87 virtual void Init() { 88 if (initialize_) { 89 clock_->FetchCurrentTime(&base_time_); 90 current_time_ = base_time_; 91 initialize_ = false; 92 } 93 } 94 95 virtual void Update() { 96 if (initialize_) { 97 Init(); 98 } 99 cvd::time::MonotonicTimePoint now; 100 clock_->FetchCurrentTime(&now); 101 // We can't call FetchCurrentTime() in the constuctor because a subclass may 102 // want to override it, so we initialze the times to 0. If we detect this 103 // case go ahead and initialize to a current timestamp. 104 if (paused_) { 105 base_time_ += now - current_time_; 106 current_time_ = now; 107 return; 108 } 109 // Avoid potential overflow by limiting the scaling to one time second. 110 // There is no round-off error here because the bases are adjusted for full 111 // seconds. 112 // There is no issue with int64 overflow because 2's compliment subtraction 113 // is immune to overflow. 114 // However, this does assume that kNanosecondsPerSecond * items_per_second_ 115 // fits in an int64. 116 cvd::time::Seconds seconds(now - base_time_); 117 base_time_ += seconds; 118 base_item_num_ += seconds.count() * items_per_second_; 119 current_time_ = now; 120 current_item_num_ = 121 cvd::time::Nanoseconds(now - base_time_).count() * 122 items_per_second_ / cvd::time::kNanosecondsPerSecond + 123 base_item_num_; 124 } 125 126 // If set to true new items will not be created. 127 bool SetPaused(bool new_state) { 128 bool rval = paused_; 129 Update(); 130 paused_ = new_state; 131 return rval; 132 } 133 134 // Calculate the TimePoint that corresponds to an item. 135 // Caution: This may not return a correct time for items in the past. 136 cvd::time::MonotonicTimePoint CalculateItemTime(int64_t item) { 137 int64_t seconds = (item - base_item_num_) / items_per_second_; 138 int64_t new_base_item_num = base_item_num_ + seconds * items_per_second_; 139 return base_time_ + cvd::time::Seconds(seconds) + 140 cvd::time::Nanoseconds(divide_and_round_up( 141 (item - new_base_item_num) * 142 cvd::time::kNanosecondsPerSecond, 143 items_per_second_)); 144 } 145 146 // Sleep until the given item number is generated. If the generator is 147 // paused unpause it to make the sleep finite. 148 void SleepUntilItem(int64_t item) { 149 if (paused_) { 150 SetPaused(false); 151 } 152 cvd::time::MonotonicTimePoint desired_time = 153 CalculateItemTime(item); 154 while (1) { 155 Update(); 156 if (current_item_num_ - item >= 0) { 157 return; 158 } 159 SleepUntilTime(desired_time); 160 } 161 } 162 163 protected: 164 // Source of the timepoints. 165 cvd::time::MonotonicTimePointFactory* clock_; 166 // Time when the other values in the structure were updated. 167 cvd::time::MonotonicTimePoint current_time_; 168 // Most recent time when there was no round-off error between the clock and 169 // items. 170 cvd::time::MonotonicTimePoint base_time_; 171 // Number of the current item. 172 int64_t current_item_num_; 173 // Most recent item number where there was no round-off error between the 174 // clock and items. 175 int64_t base_item_num_; 176 // Simulated_Item_Capacity of the buffer in items. 177 int64_t simulated_item_capacity_; 178 // Number of items that are created in 1s. A typical number would be 48000. 179 int32_t items_per_second_; 180 bool initialize_; 181 // If true then don't generate new items. 182 bool paused_; 183 }; 184 185 /** 186 * This is a simulation of an output buffer that drains at a constant rate. 187 */ 188 class SimulatedOutputBuffer : public SimulatedBufferBase { 189 public: 190 SimulatedOutputBuffer( 191 int64_t item_rate, 192 int64_t simulated_item_capacity, 193 cvd::time::MonotonicTimePointFactory* clock = 194 cvd::time::MonotonicTimePointFactory::GetInstance()) : 195 SimulatedBufferBase(item_rate, simulated_item_capacity, clock) { 196 output_buffer_item_num_ = current_item_num_; 197 } 198 199 void Update() override { 200 SimulatedBufferBase::Update(); 201 if ((output_buffer_item_num_ - current_item_num_) < 0) { 202 // We ran out of items at some point in the past. However, the 203 // output capactiy can't be negative. 204 output_buffer_item_num_ = current_item_num_; 205 } 206 } 207 208 int64_t AddToOutputBuffer(int64_t num_new_items, bool block) { 209 Update(); 210 // The easy case: num_new_items fit in the bucket. 211 if ((output_buffer_item_num_ + num_new_items - current_item_num_) <= 212 simulated_item_capacity_) { 213 output_buffer_item_num_ += num_new_items; 214 return num_new_items; 215 } 216 // If we're non-blocking accept enough items to fill the output. 217 if (!block) { 218 int64_t used = current_item_num_ + simulated_item_capacity_ - 219 output_buffer_item_num_; 220 output_buffer_item_num_ = current_item_num_ + simulated_item_capacity_; 221 return used; 222 } 223 int64_t new_output_buffer_item_num = output_buffer_item_num_ + num_new_items; 224 SleepUntilItem(new_output_buffer_item_num - simulated_item_capacity_); 225 output_buffer_item_num_ = new_output_buffer_item_num; 226 return num_new_items; 227 } 228 229 int64_t GetNextOutputBufferItemNum() { 230 Update(); 231 return output_buffer_item_num_; 232 } 233 234 cvd::time::MonotonicTimePoint GetNextOutputBufferItemTime() { 235 Update(); 236 return CalculateItemTime(output_buffer_item_num_); 237 } 238 239 int64_t GetOutputBufferSize() { 240 Update(); 241 return output_buffer_item_num_ - current_item_num_; 242 } 243 244 void Drain() { 245 SleepUntilItem(output_buffer_item_num_); 246 } 247 248 protected: 249 int64_t output_buffer_item_num_; 250 }; 251 252 /** 253 * Simulates an input buffer that fills at a constant rate. 254 */ 255 class SimulatedInputBuffer : public SimulatedBufferBase { 256 public: 257 SimulatedInputBuffer( 258 int64_t item_rate, 259 int64_t simulated_item_capacity, 260 cvd::time::MonotonicTimePointFactory* clock = 261 cvd::time::MonotonicTimePointFactory::GetInstance()) : 262 SimulatedBufferBase(item_rate, simulated_item_capacity, clock) { 263 input_buffer_item_num_ = current_item_num_; 264 lost_input_items_ = 0; 265 } 266 267 void Update() override { 268 SimulatedBufferBase::Update(); 269 if ((current_item_num_ - input_buffer_item_num_) > 270 simulated_item_capacity_) { 271 // The buffer overflowed at some point in the past. Account for the lost 272 // times. 273 int64_t new_input_buffer_item_num = 274 current_item_num_ - simulated_item_capacity_; 275 lost_input_items_ += 276 new_input_buffer_item_num - input_buffer_item_num_; 277 input_buffer_item_num_ = new_input_buffer_item_num; 278 } 279 } 280 281 int64_t RemoveFromInputBuffer(int64_t num_items_wanted, bool block) { 282 Update(); 283 if (!block) { 284 int64_t num_items_available = current_item_num_ - input_buffer_item_num_; 285 if (num_items_available < num_items_wanted) { 286 input_buffer_item_num_ += num_items_available; 287 return num_items_available; 288 } else { 289 input_buffer_item_num_ += num_items_wanted; 290 return num_items_wanted; 291 } 292 } 293 // Calculate the item number that is being claimed. Sleep until it appears. 294 // Advancing input_buffer_item_num_ causes a negative value to be compared 295 // to the capacity, effectively disabling the overflow detection code 296 // in Update(). 297 input_buffer_item_num_ += num_items_wanted; 298 while (input_buffer_item_num_ - current_item_num_ > 0) { 299 SleepUntilItem(input_buffer_item_num_); 300 } 301 return num_items_wanted; 302 } 303 304 int64_t GetLostInputItems() { 305 Update(); 306 int64_t rval = lost_input_items_; 307 lost_input_items_ = 0; 308 return rval; 309 } 310 311 protected: 312 int64_t input_buffer_item_num_; 313 int64_t lost_input_items_; 314 }; 315