1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.camera.one.v2.sharedimagereader.ticketpool;
18 
19 import com.android.camera.async.ConcurrentState;
20 import com.android.camera.async.Observable;
21 import com.android.camera.async.SafeCloseable;
22 
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.locks.Condition;
29 import java.util.concurrent.locks.ReentrantLock;
30 
31 import javax.annotation.CheckReturnValue;
32 import javax.annotation.Nonnull;
33 import javax.annotation.Nullable;
34 import javax.annotation.ParametersAreNonnullByDefault;
35 import javax.annotation.concurrent.GuardedBy;
36 
37 /**
38  * A TicketPool which reserves a capacity of tickets ahead of time from its
39  * parent.
40  * <p>
41  * The capacity of the pool is the total number of tickets which may be
42  * simultaneously acquired and not closed from this pool.
43  * <p>
44  * Increases in capacity result in tickets being requested from the parent pool.
45  * <p>
46  * Decreases in capacity result in the returning of tickets to the parent pool
47  * as soon as possible, which may depend on consumers of this ticket pool
48  * closing tickets which had previously been acquired.
49  */
50 @ParametersAreNonnullByDefault
51 public class ReservableTicketPool implements TicketPool, SafeCloseable {
52     private static class Waiter {
53         private final Condition mDoneCondition;
54         private final int mRequestedTicketCount;
55 
Waiter(Condition doneCondition, int requestedTicketCount)56         private Waiter(Condition doneCondition, int requestedTicketCount) {
57             mDoneCondition = doneCondition;
58             mRequestedTicketCount = requestedTicketCount;
59         }
60 
getDoneCondition()61         public Condition getDoneCondition() {
62             return mDoneCondition;
63         }
64 
getRequestedTicketCount()65         public int getRequestedTicketCount() {
66             return mRequestedTicketCount;
67         }
68     }
69 
70     /**
71      * Wraps tickets from the parent ticket pool with logic to either release
72      * them back to the parent pool, or hold on to them, depending on the
73      * current capacity.
74      */
75     private class TicketImpl implements Ticket {
76         private final Ticket mParentTicket;
77         private final AtomicBoolean mClosed;
78 
TicketImpl(Ticket parentTicket)79         private TicketImpl(Ticket parentTicket) {
80             mParentTicket = parentTicket;
81             mClosed = new AtomicBoolean(false);
82         }
83 
84         @Override
close()85         public void close() {
86             if (mClosed.getAndSet(true)) {
87                 return;
88             }
89             boolean releaseToParent;
90             mLock.lock();
91             try {
92                 // If mParentTickets is already at capacity, then we "overflow"
93                 // and return the ticket to the parent by closing it.
94                 // Otherwise, add it back to the local pool (mParentTickets) and
95                 // update any waiters which may want it.
96                 releaseToParent = (mParentTickets.size() == mCapacity);
97                 if (!releaseToParent) {
98                     mParentTickets.add(mParentTicket);
99                     updateCurrentTicketCount();
100                     releaseWaitersOnTicketAvailability();
101                 }
102             } finally {
103                 mLock.unlock();
104             }
105 
106             if (releaseToParent) {
107                 mParentTicket.close();
108             }
109         }
110     }
111 
112     /**
113      * The pool from which capacity is acquired and released. When capacity is
114      * acquired, tickets are taken from the parent pool and stored in
115      * {@link #mParentTickets}.
116      */
117     private final TicketPool mParentPool;
118     /**
119      * Lock for mutable state: {@link #mParentTickets}, {@link #mCapacity}, and
120      * {@link #mTicketWaiters}.
121      */
122     private final ReentrantLock mLock;
123     /**
124      * A Queue containing the number of tickets requested by each thread
125      * currently blocked in {@link #acquire}.
126      */
127     @GuardedBy("mLock")
128     private final ArrayDeque<Waiter> mTicketWaiters;
129     /**
130      * Tickets from mParentPool which have not been given to clients via
131      * {@link #acquire}.
132      */
133     @GuardedBy("mLock")
134     private final ArrayDeque<Ticket> mParentTickets;
135     /**
136      * Maintains an observable count of the number of tickets which are readily
137      * available at any time.
138      */
139     private final ConcurrentState<Integer> mAvailableTicketCount;
140     /**
141      * The total number of tickets available and outstanding (acquired but not
142      * closed).
143      */
144     @GuardedBy("mLock")
145     private int mCapacity;
146 
ReservableTicketPool(TicketPool parentPool)147     public ReservableTicketPool(TicketPool parentPool) {
148         mParentPool = parentPool;
149         mLock = new ReentrantLock(true);
150         mTicketWaiters = new ArrayDeque<>();
151         mParentTickets = new ArrayDeque<>();
152         mCapacity = 0;
153         mAvailableTicketCount = new ConcurrentState<>(0);
154     }
155 
156     @GuardedBy("mLock")
updateCurrentTicketCount()157     private void updateCurrentTicketCount() {
158         mLock.lock();
159         try {
160             if (mTicketWaiters.size() != 0) {
161                 mAvailableTicketCount.update(0);
162             } else {
163                 mAvailableTicketCount.update(mParentTickets.size());
164             }
165         } finally {
166             mLock.unlock();
167         }
168     }
169 
170     @Nonnull
171     @Override
acquire(int tickets)172     public Collection<Ticket> acquire(int tickets) throws InterruptedException,
173             NoCapacityAvailableException {
174         Collection<Ticket> acquiredParentTickets = acquireParentTickets(tickets);
175 
176         List<Ticket> wrappedTicketList = new ArrayList<>();
177         for (Ticket parentTicket : acquiredParentTickets) {
178             wrappedTicketList.add(new TicketImpl(parentTicket));
179         }
180         return wrappedTicketList;
181     }
182 
183     @Nonnull
184     @Override
getAvailableTicketCount()185     public Observable<Integer> getAvailableTicketCount() {
186         return mAvailableTicketCount;
187     }
188 
189     @Override
tryAcquire()190     public Ticket tryAcquire() {
191         Ticket parentTicket;
192         mLock.lock();
193         try {
194             if (mParentTickets.isEmpty() || mTicketWaiters.size() > 0) {
195                 return null;
196             }
197             parentTicket = mParentTickets.remove();
198             updateCurrentTicketCount();
199         } finally {
200             mLock.unlock();
201         }
202 
203         return new TicketImpl(parentTicket);
204     }
205 
206     /**
207      * Reserves tickets from the parent pool.
208      *
209      * @param additionalCapacity The additional capacity to acquire.
210      * @throws InterruptedException If interrupted while trying to acquire the
211      *             necessary number of tickets.
212      */
reserveCapacity(int additionalCapacity)213     public void reserveCapacity(int additionalCapacity) throws InterruptedException,
214             NoCapacityAvailableException {
215         Collection<Ticket> tickets = mParentPool.acquire(additionalCapacity);
216 
217         mLock.lock();
218         try {
219             mCapacity += additionalCapacity;
220 
221             for (Ticket ticket : tickets) {
222                 mParentTickets.add(ticket);
223             }
224 
225             releaseWaitersOnTicketAvailability();
226         } finally {
227             mLock.unlock();
228         }
229 
230         updateCurrentTicketCount();
231     }
232 
233     /**
234      * Releases the capacity to the parent ticket pool. Note that the tickets
235      * will be released as soon as possible. However, this is not necessarily
236      * immediately if there are tickets which have been acquired() by a user of
237      * this class, but not yet released.
238      *
239      * @param capacityToRelease The amount of capacity to release.
240      */
releaseCapacity(int capacityToRelease)241     public void releaseCapacity(int capacityToRelease) {
242         if (capacityToRelease <= 0) {
243             return;
244         }
245         List<Ticket> parentTicketsToRelease = new ArrayList<>();
246 
247         mLock.lock();
248         try {
249             if (capacityToRelease > mCapacity) {
250                 capacityToRelease = mCapacity;
251             }
252 
253             mCapacity -= capacityToRelease;
254 
255             // Release as many tickets as necessary, immediately.
256             int numParentTicketsToRelease = Math.min(mParentTickets.size(), capacityToRelease);
257             for (int i = 0; i < numParentTicketsToRelease; i++) {
258                 parentTicketsToRelease.add(mParentTickets.remove());
259             }
260 
261             abortWaitersOnCapacityDecrease();
262         } finally {
263             mLock.unlock();
264         }
265 
266         for (Ticket ticket : parentTicketsToRelease) {
267             ticket.close();
268         }
269 
270         updateCurrentTicketCount();
271     }
272 
273     /**
274      * Releases all remaining capacity to the parent ticket pool.
275      */
releaseAllCapacity()276     private void releaseAllCapacity() {
277         mLock.lock();
278         try {
279             releaseCapacity(mCapacity);
280         } finally {
281             mLock.unlock();
282         }
283     }
284 
285     @Override
close()286     public void close() {
287         releaseAllCapacity();
288     }
289 
290     /**
291      * Acquires the specified number of tickets from mParentTickets atomically,
292      * blocking until released by {@link #releaseWaitersOnTicketAvailability}.
293      *
294      * @param tickets The number of tickets to acquire.
295      */
acquireParentTickets(int tickets)296     private Collection<Ticket> acquireParentTickets(int tickets) throws InterruptedException,
297             NoCapacityAvailableException {
298         // The list of tickets from mTicketList to acquire.
299         // Try to acquire these immediately, if there are no threads already
300         // waiting for tickets.
301         List<Ticket> acquiredParentTickets = null;
302         mLock.lock();
303         try {
304             if (mTicketWaiters.isEmpty()) {
305                 acquiredParentTickets = tryAcquireAtomically(tickets);
306             }
307             Waiter thisWaiter = new Waiter(mLock.newCondition(), tickets);
308             mTicketWaiters.add(thisWaiter);
309             updateCurrentTicketCount();
310             try {
311                 while (acquiredParentTickets == null) {
312                     thisWaiter.getDoneCondition().await();
313                     acquiredParentTickets = tryAcquireAtomically(tickets);
314                 }
315             } finally {
316                 mTicketWaiters.remove(thisWaiter);
317             }
318             updateCurrentTicketCount();
319         } finally {
320             mLock.unlock();
321         }
322         return acquiredParentTickets;
323     }
324 
325     /**
326      * Atomically attempt to remove the necessary number of tickets. This must
327      * be an all-or-nothing attempt to avoid multiple acquire() calls from
328      * deadlocking by each partially acquiring the necessary number of tickets.
329      *
330      * @return The tickets acquired from the parent ticket pool, or null if they
331      *         could not be acquired.
332      */
333     @Nullable
334     @CheckReturnValue
tryAcquireAtomically(int tickets)335     private List<Ticket> tryAcquireAtomically(int tickets) throws NoCapacityAvailableException {
336         List<Ticket> acquiredParentTickets = new ArrayList<>();
337         mLock.lock();
338         try {
339             if (tickets > mCapacity) {
340                 throw new NoCapacityAvailableException();
341             }
342             if (mParentTickets.size() >= tickets) {
343                 for (int i = 0; i < tickets; i++) {
344                     acquiredParentTickets.add(mParentTickets.remove());
345                 }
346                 updateCurrentTicketCount();
347                 return acquiredParentTickets;
348             }
349         } finally {
350             mLock.unlock();
351         }
352         return null;
353     }
354 
releaseWaitersOnTicketAvailability()355     private void releaseWaitersOnTicketAvailability() {
356         mLock.lock();
357         try {
358             // Release waiters, in order, so long as their requests can be
359             // fulfilled.
360             int numTicketsReadilyAvailable = mParentTickets.size();
361             while (mTicketWaiters.size() > 0) {
362                 Waiter nextWaiter = mTicketWaiters.peek();
363                 if (nextWaiter.getRequestedTicketCount() <= numTicketsReadilyAvailable) {
364                     numTicketsReadilyAvailable -= nextWaiter.getRequestedTicketCount();
365                     nextWaiter.getDoneCondition().signal();
366                     mTicketWaiters.poll();
367                 } else {
368                     return;
369                 }
370             }
371         } finally {
372             mLock.unlock();
373         }
374     }
375 
abortWaitersOnCapacityDecrease()376     private void abortWaitersOnCapacityDecrease() {
377         mLock.lock();
378         try {
379             // Release all waiters requesting more tickets than the current
380             // capacity
381             List<Waiter> toRemove = new ArrayList<>();
382             for (Waiter waiter : mTicketWaiters) {
383                 if (waiter.getRequestedTicketCount() > mCapacity) {
384                     toRemove.add(waiter);
385                 }
386             }
387             for (Waiter waiter : toRemove) {
388                 waiter.getDoneCondition().signal();
389             }
390         } finally {
391             mLock.unlock();
392         }
393     }
394 }
395