1 /* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.camera.one.v2.sharedimagereader.ticketpool; 18 19 import com.android.camera.async.ConcurrentState; 20 import com.android.camera.async.Observable; 21 import com.android.camera.async.SafeCloseable; 22 23 import java.util.ArrayDeque; 24 import java.util.ArrayList; 25 import java.util.Collection; 26 import java.util.List; 27 import java.util.concurrent.atomic.AtomicBoolean; 28 import java.util.concurrent.locks.Condition; 29 import java.util.concurrent.locks.ReentrantLock; 30 31 import javax.annotation.CheckReturnValue; 32 import javax.annotation.Nonnull; 33 import javax.annotation.Nullable; 34 import javax.annotation.ParametersAreNonnullByDefault; 35 import javax.annotation.concurrent.GuardedBy; 36 37 /** 38 * A TicketPool which reserves a capacity of tickets ahead of time from its 39 * parent. 40 * <p> 41 * The capacity of the pool is the total number of tickets which may be 42 * simultaneously acquired and not closed from this pool. 43 * <p> 44 * Increases in capacity result in tickets being requested from the parent pool. 45 * <p> 46 * Decreases in capacity result in the returning of tickets to the parent pool 47 * as soon as possible, which may depend on consumers of this ticket pool 48 * closing tickets which had previously been acquired. 49 */ 50 @ParametersAreNonnullByDefault 51 public class ReservableTicketPool implements TicketPool, SafeCloseable { 52 private static class Waiter { 53 private final Condition mDoneCondition; 54 private final int mRequestedTicketCount; 55 Waiter(Condition doneCondition, int requestedTicketCount)56 private Waiter(Condition doneCondition, int requestedTicketCount) { 57 mDoneCondition = doneCondition; 58 mRequestedTicketCount = requestedTicketCount; 59 } 60 getDoneCondition()61 public Condition getDoneCondition() { 62 return mDoneCondition; 63 } 64 getRequestedTicketCount()65 public int getRequestedTicketCount() { 66 return mRequestedTicketCount; 67 } 68 } 69 70 /** 71 * Wraps tickets from the parent ticket pool with logic to either release 72 * them back to the parent pool, or hold on to them, depending on the 73 * current capacity. 74 */ 75 private class TicketImpl implements Ticket { 76 private final Ticket mParentTicket; 77 private final AtomicBoolean mClosed; 78 TicketImpl(Ticket parentTicket)79 private TicketImpl(Ticket parentTicket) { 80 mParentTicket = parentTicket; 81 mClosed = new AtomicBoolean(false); 82 } 83 84 @Override close()85 public void close() { 86 if (mClosed.getAndSet(true)) { 87 return; 88 } 89 boolean releaseToParent; 90 mLock.lock(); 91 try { 92 // If mParentTickets is already at capacity, then we "overflow" 93 // and return the ticket to the parent by closing it. 94 // Otherwise, add it back to the local pool (mParentTickets) and 95 // update any waiters which may want it. 96 releaseToParent = (mParentTickets.size() == mCapacity); 97 if (!releaseToParent) { 98 mParentTickets.add(mParentTicket); 99 updateCurrentTicketCount(); 100 releaseWaitersOnTicketAvailability(); 101 } 102 } finally { 103 mLock.unlock(); 104 } 105 106 if (releaseToParent) { 107 mParentTicket.close(); 108 } 109 } 110 } 111 112 /** 113 * The pool from which capacity is acquired and released. When capacity is 114 * acquired, tickets are taken from the parent pool and stored in 115 * {@link #mParentTickets}. 116 */ 117 private final TicketPool mParentPool; 118 /** 119 * Lock for mutable state: {@link #mParentTickets}, {@link #mCapacity}, and 120 * {@link #mTicketWaiters}. 121 */ 122 private final ReentrantLock mLock; 123 /** 124 * A Queue containing the number of tickets requested by each thread 125 * currently blocked in {@link #acquire}. 126 */ 127 @GuardedBy("mLock") 128 private final ArrayDeque<Waiter> mTicketWaiters; 129 /** 130 * Tickets from mParentPool which have not been given to clients via 131 * {@link #acquire}. 132 */ 133 @GuardedBy("mLock") 134 private final ArrayDeque<Ticket> mParentTickets; 135 /** 136 * Maintains an observable count of the number of tickets which are readily 137 * available at any time. 138 */ 139 private final ConcurrentState<Integer> mAvailableTicketCount; 140 /** 141 * The total number of tickets available and outstanding (acquired but not 142 * closed). 143 */ 144 @GuardedBy("mLock") 145 private int mCapacity; 146 ReservableTicketPool(TicketPool parentPool)147 public ReservableTicketPool(TicketPool parentPool) { 148 mParentPool = parentPool; 149 mLock = new ReentrantLock(true); 150 mTicketWaiters = new ArrayDeque<>(); 151 mParentTickets = new ArrayDeque<>(); 152 mCapacity = 0; 153 mAvailableTicketCount = new ConcurrentState<>(0); 154 } 155 156 @GuardedBy("mLock") updateCurrentTicketCount()157 private void updateCurrentTicketCount() { 158 mLock.lock(); 159 try { 160 if (mTicketWaiters.size() != 0) { 161 mAvailableTicketCount.update(0); 162 } else { 163 mAvailableTicketCount.update(mParentTickets.size()); 164 } 165 } finally { 166 mLock.unlock(); 167 } 168 } 169 170 @Nonnull 171 @Override acquire(int tickets)172 public Collection<Ticket> acquire(int tickets) throws InterruptedException, 173 NoCapacityAvailableException { 174 Collection<Ticket> acquiredParentTickets = acquireParentTickets(tickets); 175 176 List<Ticket> wrappedTicketList = new ArrayList<>(); 177 for (Ticket parentTicket : acquiredParentTickets) { 178 wrappedTicketList.add(new TicketImpl(parentTicket)); 179 } 180 return wrappedTicketList; 181 } 182 183 @Nonnull 184 @Override getAvailableTicketCount()185 public Observable<Integer> getAvailableTicketCount() { 186 return mAvailableTicketCount; 187 } 188 189 @Override tryAcquire()190 public Ticket tryAcquire() { 191 Ticket parentTicket; 192 mLock.lock(); 193 try { 194 if (mParentTickets.isEmpty() || mTicketWaiters.size() > 0) { 195 return null; 196 } 197 parentTicket = mParentTickets.remove(); 198 updateCurrentTicketCount(); 199 } finally { 200 mLock.unlock(); 201 } 202 203 return new TicketImpl(parentTicket); 204 } 205 206 /** 207 * Reserves tickets from the parent pool. 208 * 209 * @param additionalCapacity The additional capacity to acquire. 210 * @throws InterruptedException If interrupted while trying to acquire the 211 * necessary number of tickets. 212 */ reserveCapacity(int additionalCapacity)213 public void reserveCapacity(int additionalCapacity) throws InterruptedException, 214 NoCapacityAvailableException { 215 Collection<Ticket> tickets = mParentPool.acquire(additionalCapacity); 216 217 mLock.lock(); 218 try { 219 mCapacity += additionalCapacity; 220 221 for (Ticket ticket : tickets) { 222 mParentTickets.add(ticket); 223 } 224 225 releaseWaitersOnTicketAvailability(); 226 } finally { 227 mLock.unlock(); 228 } 229 230 updateCurrentTicketCount(); 231 } 232 233 /** 234 * Releases the capacity to the parent ticket pool. Note that the tickets 235 * will be released as soon as possible. However, this is not necessarily 236 * immediately if there are tickets which have been acquired() by a user of 237 * this class, but not yet released. 238 * 239 * @param capacityToRelease The amount of capacity to release. 240 */ releaseCapacity(int capacityToRelease)241 public void releaseCapacity(int capacityToRelease) { 242 if (capacityToRelease <= 0) { 243 return; 244 } 245 List<Ticket> parentTicketsToRelease = new ArrayList<>(); 246 247 mLock.lock(); 248 try { 249 if (capacityToRelease > mCapacity) { 250 capacityToRelease = mCapacity; 251 } 252 253 mCapacity -= capacityToRelease; 254 255 // Release as many tickets as necessary, immediately. 256 int numParentTicketsToRelease = Math.min(mParentTickets.size(), capacityToRelease); 257 for (int i = 0; i < numParentTicketsToRelease; i++) { 258 parentTicketsToRelease.add(mParentTickets.remove()); 259 } 260 261 abortWaitersOnCapacityDecrease(); 262 } finally { 263 mLock.unlock(); 264 } 265 266 for (Ticket ticket : parentTicketsToRelease) { 267 ticket.close(); 268 } 269 270 updateCurrentTicketCount(); 271 } 272 273 /** 274 * Releases all remaining capacity to the parent ticket pool. 275 */ releaseAllCapacity()276 private void releaseAllCapacity() { 277 mLock.lock(); 278 try { 279 releaseCapacity(mCapacity); 280 } finally { 281 mLock.unlock(); 282 } 283 } 284 285 @Override close()286 public void close() { 287 releaseAllCapacity(); 288 } 289 290 /** 291 * Acquires the specified number of tickets from mParentTickets atomically, 292 * blocking until released by {@link #releaseWaitersOnTicketAvailability}. 293 * 294 * @param tickets The number of tickets to acquire. 295 */ acquireParentTickets(int tickets)296 private Collection<Ticket> acquireParentTickets(int tickets) throws InterruptedException, 297 NoCapacityAvailableException { 298 // The list of tickets from mTicketList to acquire. 299 // Try to acquire these immediately, if there are no threads already 300 // waiting for tickets. 301 List<Ticket> acquiredParentTickets = null; 302 mLock.lock(); 303 try { 304 if (mTicketWaiters.isEmpty()) { 305 acquiredParentTickets = tryAcquireAtomically(tickets); 306 } 307 Waiter thisWaiter = new Waiter(mLock.newCondition(), tickets); 308 mTicketWaiters.add(thisWaiter); 309 updateCurrentTicketCount(); 310 try { 311 while (acquiredParentTickets == null) { 312 thisWaiter.getDoneCondition().await(); 313 acquiredParentTickets = tryAcquireAtomically(tickets); 314 } 315 } finally { 316 mTicketWaiters.remove(thisWaiter); 317 } 318 updateCurrentTicketCount(); 319 } finally { 320 mLock.unlock(); 321 } 322 return acquiredParentTickets; 323 } 324 325 /** 326 * Atomically attempt to remove the necessary number of tickets. This must 327 * be an all-or-nothing attempt to avoid multiple acquire() calls from 328 * deadlocking by each partially acquiring the necessary number of tickets. 329 * 330 * @return The tickets acquired from the parent ticket pool, or null if they 331 * could not be acquired. 332 */ 333 @Nullable 334 @CheckReturnValue tryAcquireAtomically(int tickets)335 private List<Ticket> tryAcquireAtomically(int tickets) throws NoCapacityAvailableException { 336 List<Ticket> acquiredParentTickets = new ArrayList<>(); 337 mLock.lock(); 338 try { 339 if (tickets > mCapacity) { 340 throw new NoCapacityAvailableException(); 341 } 342 if (mParentTickets.size() >= tickets) { 343 for (int i = 0; i < tickets; i++) { 344 acquiredParentTickets.add(mParentTickets.remove()); 345 } 346 updateCurrentTicketCount(); 347 return acquiredParentTickets; 348 } 349 } finally { 350 mLock.unlock(); 351 } 352 return null; 353 } 354 releaseWaitersOnTicketAvailability()355 private void releaseWaitersOnTicketAvailability() { 356 mLock.lock(); 357 try { 358 // Release waiters, in order, so long as their requests can be 359 // fulfilled. 360 int numTicketsReadilyAvailable = mParentTickets.size(); 361 while (mTicketWaiters.size() > 0) { 362 Waiter nextWaiter = mTicketWaiters.peek(); 363 if (nextWaiter.getRequestedTicketCount() <= numTicketsReadilyAvailable) { 364 numTicketsReadilyAvailable -= nextWaiter.getRequestedTicketCount(); 365 nextWaiter.getDoneCondition().signal(); 366 mTicketWaiters.poll(); 367 } else { 368 return; 369 } 370 } 371 } finally { 372 mLock.unlock(); 373 } 374 } 375 abortWaitersOnCapacityDecrease()376 private void abortWaitersOnCapacityDecrease() { 377 mLock.lock(); 378 try { 379 // Release all waiters requesting more tickets than the current 380 // capacity 381 List<Waiter> toRemove = new ArrayList<>(); 382 for (Waiter waiter : mTicketWaiters) { 383 if (waiter.getRequestedTicketCount() > mCapacity) { 384 toRemove.add(waiter); 385 } 386 } 387 for (Waiter waiter : toRemove) { 388 waiter.getDoneCondition().signal(); 389 } 390 } finally { 391 mLock.unlock(); 392 } 393 } 394 } 395