1 /*
2  * Copyright (C) 2014 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.camera.util;
18 
19 import android.os.Handler;
20 import android.util.Pair;
21 
22 import com.android.camera.debug.Log.Tag;
23 
24 import java.security.InvalidParameterException;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.Map;
28 import java.util.TreeMap;
29 import java.util.concurrent.Semaphore;
30 
31 /**
32  * Implements a thread-safe fixed-size pool map of integers to objects such that
33  * the least element may be swapped out for a new element at any time. Elements
34  * may be temporarily "pinned" for processing in separate threads, during which
35  * they will not be swapped out. <br>
36  * This class enforces the invariant that a new element can always be swapped
37  * in. Thus, requests to pin an element for a particular task may be denied if
38  * there are not enough unpinned elements which can be removed. <br>
39  */
40 public class ConcurrentSharedRingBuffer<E> {
41     private static final Tag TAG = new Tag("CncrrntShrdRingBuf");
42 
43     /**
44      * Callback interface for swapping elements at the head of the buffer.
45      */
46     public static interface SwapTask<E> {
47         /**
48          * Called if the buffer is under-capacity and a new element is being
49          * added.
50          *
51          * @return the new element to add.
52          */
create()53         public E create();
54 
55         /**
56          * Called if the buffer is full and an old element must be swapped out
57          * to make room for the new element.
58          *
59          * @param oldElement the element being removed from the buffer.
60          * @return the new element to add.
61          */
swap(E oldElement)62         public E swap(E oldElement);
63 
64         /**
65          * Called if the buffer already has an element with the specified key.
66          * Note that the element may currently be pinned for processing by other
67          * elements. Therefore, implementations must be thread safe with respect
68          * to any other operations which may be applied to pinned tasks.
69          *
70          * @param existingElement the element to be updated.
71          */
update(E existingElement)72         public void update(E existingElement);
73 
74         /**
75          * Returns the key of the element that the ring buffer should prefer
76          * when considering a swapping candidate. If the returned key is not an
77          * unpinned element then ring buffer will replace the element with least
78          * key.
79          *
80          * @return a key of an existing unpinned element or a negative value.
81          */
getSwapKey()82         public long getSwapKey();
83     }
84 
85     /**
86      * Callback for selecting an element to pin. See
87      * {@link tryPinGreatestSelected}.
88      */
89     public static interface Selector<E> {
90         /**
91          * @param element The element to select or not select.
92          * @return true if the element should be selected, false otherwise.
93          */
select(E element)94         public boolean select(E element);
95     }
96 
97     public static interface PinStateListener {
98         /**
99          * Invoked whenever the ability to pin an element for processing
100          * changes.
101          *
102          * @param pinsAvailable If true, requests to pin elements (e.g. calls to
103          *            pinGreatest()) are less-likely to fail. If false, they are
104          *            more-likely to fail.
105          */
onPinStateChange(boolean pinsAvailable)106         public void onPinStateChange(boolean pinsAvailable);
107     }
108 
109     /**
110      * Wraps E with reference counting.
111      */
112     private static class Pinnable<E> {
113         private E mElement;
114 
115         /** Reference-counting for the number of tasks holding this element. */
116         private int mPins;
117 
Pinnable(E element)118         public Pinnable(E element) {
119             mElement = element;
120             mPins = 0;
121         }
122 
getElement()123         public E getElement() {
124             return mElement;
125         }
126 
isPinned()127         private boolean isPinned() {
128             return mPins > 0;
129         }
130     }
131 
132     /**
133      * A Semaphore that allows to reduce permits to negative values.
134      */
135     private static class NegativePermitsSemaphore extends Semaphore {
NegativePermitsSemaphore(int permits)136         public NegativePermitsSemaphore(int permits) {
137             super(permits);
138         }
139 
140         /**
141          * Reduces the number of permits by <code>permits</code>.
142          * <p/>
143          * This method can only be called when number of available permits is
144          * zero.
145          */
146         @Override
reducePermits(int permits)147         public void reducePermits(int permits) {
148             if (availablePermits() != 0) {
149                 throw new IllegalStateException("Called without draining the semaphore.");
150             }
151             super.reducePermits(permits);
152         }
153     }
154 
155     /** Allow only one swapping operation at a time. */
156     private final Object mSwapLock = new Object();
157     /**
158      * Lock all transactions involving mElements, mUnpinnedElements,
159      * mCapacitySemaphore, mPinSemaphore, mClosed, mPinStateHandler, and
160      * mPinStateListener and the state of Pinnable instances. <br>
161      * TODO Replace this with a priority semaphore and allow swapLeast()
162      * operations to run faster at the expense of slower tryPin()/release()
163      * calls.
164      */
165     private final Object mLock = new Object();
166     /** Stores all elements. */
167     private TreeMap<Long, Pinnable<E>> mElements;
168     /** Stores the subset of mElements which is not pinned. */
169     private TreeMap<Long, Pinnable<E>> mUnpinnedElements;
170     /** Used to acquire space in mElements. */
171     private final Semaphore mCapacitySemaphore;
172     /** This must be acquired while an element is pinned. */
173     private final NegativePermitsSemaphore mPinSemaphore;
174     private boolean mClosed = false;
175 
176     private Handler mPinStateHandler = null;
177     private PinStateListener mPinStateListener = null;
178 
179     /**
180      * Constructs a new ring buffer with the specified capacity.
181      *
182      * @param capacity the maximum number of elements to store.
183      */
ConcurrentSharedRingBuffer(int capacity)184     public ConcurrentSharedRingBuffer(int capacity) {
185         if (capacity <= 0) {
186             throw new IllegalArgumentException("Capacity must be positive.");
187         }
188 
189         mElements = new TreeMap<Long, Pinnable<E>>();
190         mUnpinnedElements = new TreeMap<Long, Pinnable<E>>();
191         mCapacitySemaphore = new Semaphore(capacity);
192         // Start with -1 permits to pin elements since we must always have at
193         // least one unpinned
194         // element available to swap out as the head of the buffer.
195         mPinSemaphore = new NegativePermitsSemaphore(-1);
196     }
197 
198     /**
199      * Sets or replaces the listener.
200      *
201      * @param handler The handler on which to invoke the listener.
202      * @param listener The listener to be called whenever the ability to pin an
203      *            element changes.
204      */
setListener(Handler handler, PinStateListener listener)205     public void setListener(Handler handler, PinStateListener listener) {
206         synchronized (mLock) {
207             mPinStateHandler = handler;
208             mPinStateListener = listener;
209         }
210     }
211 
212     /**
213      * Places a new element in the ring buffer, removing the least (by key)
214      * non-pinned element if necessary. The existing element (or {@code null} if
215      * the buffer is under-capacity) is passed to {@code swapper.swap()} and the
216      * result is saved to the buffer. If an entry with {@code newKey} already
217      * exists in the ring-buffer, then {@code swapper.update()} is called and
218      * may modify the element in-place. See {@link SwapTask}. <br>
219      * Note that this method is the only way to add new elements to the buffer
220      * and will never be blocked on pinned tasks.
221      *
222      * @param newKey the key with which to store the swapped-in element.
223      * @param swapper the callback used to perform the swap.
224      * @return true if the swap was successful and the new element was saved to
225      *         the buffer, false if the swap was not possible and the element
226      *         was not saved to the buffer. Note that if the swap failed,
227      *         {@code swapper.create()} may or may not have been invoked.
228      */
swapLeast(long newKey, SwapTask<E> swapper)229     public boolean swapLeast(long newKey, SwapTask<E> swapper) {
230         synchronized (mSwapLock) {
231             Pinnable<E> existingElement = null;
232 
233             synchronized (mLock) {
234                 if (mClosed) {
235                     return false;
236                 }
237                 existingElement = mElements.get(newKey);
238             }
239 
240             if (existingElement != null) {
241                 swapper.update(existingElement.getElement());
242                 return true;
243             }
244 
245             if (mCapacitySemaphore.tryAcquire()) {
246                 // If we are under capacity, insert the new element and return.
247                 Pinnable<E> p = new Pinnable<E>(swapper.create());
248 
249                 synchronized (mLock) {
250                     if (mClosed) {
251                         return false;
252                     }
253 
254                     // Add the new element and release another permit to pin
255                     // allow pinning another element.
256                     mElements.put(newKey, p);
257                     mUnpinnedElements.put(newKey, p);
258                     mPinSemaphore.release();
259                     if (mPinSemaphore.availablePermits() == 1) {
260                         notifyPinStateChange(true);
261                     }
262                 }
263 
264                 return true;
265             } else {
266                 Pinnable<E> toSwap;
267 
268                 // Note that this method must be synchronized to avoid
269                 // attempting to remove more than one unpinned element at a
270                 // time.
271                 synchronized (mLock) {
272                     if (mClosed) {
273                         return false;
274                     }
275                     Pair<Long, Pinnable<E>> toSwapEntry = null;
276                     long swapKey = swapper.getSwapKey();
277                     // If swapKey is same as the inserted key return early.
278                     if (swapKey == newKey) {
279                         return false;
280                     }
281 
282                     if (mUnpinnedElements.containsKey(swapKey)) {
283                         toSwapEntry = Pair.create(swapKey, mUnpinnedElements.remove(swapKey));
284                     } else {
285                         // The returned key from getSwapKey was not found in the
286                         // unpinned elements use the least entry from the
287                         // unpinned elements.
288                         Map.Entry<Long, Pinnable<E>> swapEntry = mUnpinnedElements.pollFirstEntry();
289                         if (swapEntry != null) {
290                             toSwapEntry = Pair.create(swapEntry.getKey(), swapEntry.getValue());
291                         }
292                     }
293 
294                     if (toSwapEntry == null) {
295                         // We can get here if no unpinned element was found.
296                         return false;
297                     }
298 
299                     toSwap = toSwapEntry.second;
300 
301                     // We must remove the element from both mElements and
302                     // mUnpinnedElements because it must be re-added after the
303                     // swap to be placed in the correct order with newKey.
304                     mElements.remove(toSwapEntry.first);
305                 }
306 
307                 try {
308                     toSwap.mElement = swapper.swap(toSwap.mElement);
309                 } finally {
310                     synchronized (mLock) {
311                         if (mClosed) {
312                             return false;
313                         }
314 
315                         mElements.put(newKey, toSwap);
316                         mUnpinnedElements.put(newKey, toSwap);
317                     }
318                 }
319                 return true;
320             }
321         }
322     }
323 
324     /**
325      * Attempts to pin the element with the given key and return it. <br>
326      * Note that, if a non-null pair is returned, the caller <em>must</em> call
327      * {@link #release} with the key.
328      *
329      * @return the key and object of the pinned element, if one could be pinned,
330      *         or null.
331      */
tryPin(long key)332     public Pair<Long, E> tryPin(long key) {
333 
334         boolean acquiredLastPin = false;
335         Pinnable<E> entry = null;
336 
337         synchronized (mLock) {
338             if (mClosed) {
339                 return null;
340             }
341 
342             if (mElements.isEmpty()) {
343                 return null;
344             }
345 
346             entry = mElements.get(key);
347 
348             if (entry == null) {
349                 return null;
350             }
351 
352             if (entry.isPinned()) {
353                 // If the element is already pinned by another task, simply
354                 // increment the pin count.
355                 entry.mPins++;
356             } else {
357                 // We must ensure that there will still be an unpinned element
358                 // after we pin this one.
359                 if (mPinSemaphore.tryAcquire()) {
360                     mUnpinnedElements.remove(key);
361                     entry.mPins++;
362 
363                     acquiredLastPin = mPinSemaphore.availablePermits() <= 0;
364                 } else {
365                     return null;
366                 }
367             }
368         }
369 
370         // If we just grabbed the last permit, we must notify listeners of the
371         // pin
372         // state change.
373         if (acquiredLastPin) {
374             notifyPinStateChange(false);
375         }
376 
377         return Pair.create(key, entry.getElement());
378     }
379 
release(long key)380     public void release(long key) {
381         synchronized (mLock) {
382             // Note that this must proceed even if the buffer has been closed.
383 
384             Pinnable<E> element = mElements.get(key);
385 
386             if (element == null) {
387                 throw new InvalidParameterException(
388                         "No entry found for the given key: " + key + ".");
389             }
390 
391             if (!element.isPinned()) {
392                 throw new IllegalArgumentException("Calling release() with unpinned element.");
393             }
394 
395             // Unpin the element
396             element.mPins--;
397 
398             if (!element.isPinned()) {
399                 // If there are now 0 tasks pinning this element...
400                 mUnpinnedElements.put(key, element);
401 
402                 // Allow pinning another element.
403                 mPinSemaphore.release();
404 
405                 if (mPinSemaphore.availablePermits() == 1) {
406                     notifyPinStateChange(true);
407                 }
408             }
409         }
410     }
411 
412     /**
413      * Attempts to pin the greatest element and return it. <br>
414      * Note that, if a non-null element is returned, the caller <em>must</em>
415      * call {@link #release} with the element. Furthermore, behavior is
416      * undefined if the element's {@code compareTo} behavior changes between
417      * these calls.
418      *
419      * @return the key and object of the pinned element, if one could be pinned,
420      *         or null.
421      */
tryPinGreatest()422     public Pair<Long, E> tryPinGreatest() {
423         synchronized (mLock) {
424             if (mClosed) {
425                 return null;
426             }
427 
428             if (mElements.isEmpty()) {
429                 return null;
430             }
431 
432             return tryPin(mElements.lastKey());
433         }
434     }
435 
436     /**
437      * Attempts to pin the greatest element for which {@code selector} returns
438      * true. <br>
439      *
440      * @see #pinGreatest
441      */
tryPinGreatestSelected(Selector<E> selector)442     public Pair<Long, E> tryPinGreatestSelected(Selector<E> selector) {
443         // (Quickly) get the list of elements to search through.
444         ArrayList<Long> keys = new ArrayList<Long>();
445         synchronized (mLock) {
446             if (mClosed) {
447                 return null;
448             }
449 
450             if (mElements.isEmpty()) {
451                 return null;
452             }
453 
454             keys.addAll(mElements.keySet());
455         }
456 
457         Collections.sort(keys);
458 
459         // Pin each element, from greatest key to least, until we find the one
460         // we want (the element with the greatest key for which
461         // selector.selected() returns true).
462         for (int i = keys.size() - 1; i >= 0; i--) {
463             Pair<Long, E> pinnedCandidate = tryPin(keys.get(i));
464             if (pinnedCandidate != null) {
465                 boolean selected = false;
466 
467                 try {
468                     selected = selector.select(pinnedCandidate.second);
469                 } finally {
470                     // Don't leak pinnedCandidate if the above select() threw an
471                     // exception.
472                     if (selected) {
473                         return pinnedCandidate;
474                     } else {
475                         release(pinnedCandidate.first);
476                     }
477                 }
478             }
479         }
480 
481         return null;
482     }
483 
484     /**
485      * Removes all elements from the buffer, running {@code task} on each one,
486      * and waiting, if necessary, for all pins to be released.
487      *
488      * @param task
489      * @throws InterruptedException
490      */
close(Task<E> task)491     public void close(Task<E> task) throws InterruptedException {
492         int numPinnedElements;
493 
494         // Ensure that any pending swap tasks complete before closing.
495         synchronized (mSwapLock) {
496             synchronized (mLock) {
497                 mClosed = true;
498                 numPinnedElements = mElements.size() - mUnpinnedElements.size();
499             }
500         }
501 
502         notifyPinStateChange(false);
503 
504         // Wait for all pinned tasks to complete.
505         if (numPinnedElements > 0) {
506             mPinSemaphore.acquire(numPinnedElements);
507         }
508 
509         for (Pinnable<E> element : mElements.values()) {
510             task.run(element.mElement);
511             // Release the capacity permits.
512             mCapacitySemaphore.release();
513         }
514 
515         mUnpinnedElements.clear();
516 
517         mElements.clear();
518     }
519 
520     /**
521      * Attempts to get a pinned element for the given key.
522      *
523      * @param key the key of the pinned element.
524      * @return (key, value) pair if found otherwise null.
525      */
tryGetPinned(long key)526     public Pair<Long, E> tryGetPinned(long key) {
527         synchronized (mLock) {
528             if (mClosed) {
529                 return null;
530             }
531             for (java.util.Map.Entry<Long, Pinnable<E>> element : mElements.entrySet()) {
532                 if (element.getKey() == key) {
533                     if (element.getValue().isPinned()) {
534                         return Pair.create(element.getKey(), element.getValue().getElement());
535                     } else {
536                         return null;
537                     }
538                 }
539             }
540         }
541         return null;
542     }
543 
544     /**
545      * Reopens previously closed buffer.
546      * <p/>
547      * Buffer should be closed before calling this method. If called with an
548      * open buffer an {@link IllegalStateException} is thrown.
549      *
550      * @param unpinnedReservedSlotCount a non-negative integer for number of
551      *            slots to reserve for unpinned elements. These slots can never
552      *            be pinned and will always be available for swapping.
553      * @throws InterruptedException
554      */
reopenBuffer(int unpinnedReservedSlotCount)555     public void reopenBuffer(int unpinnedReservedSlotCount)
556             throws InterruptedException {
557         if (unpinnedReservedSlotCount < 0
558                 || unpinnedReservedSlotCount >= mCapacitySemaphore.availablePermits()) {
559             throw new IllegalArgumentException("Invalid unpinned reserved slot count: " +
560                     unpinnedReservedSlotCount);
561         }
562 
563         // Ensure that any pending swap tasks complete before closing.
564         synchronized (mSwapLock) {
565             synchronized (mLock) {
566                 if (!mClosed) {
567                     throw new IllegalStateException(
568                             "Attempt to reopen the buffer when it is not closed.");
569                 }
570 
571                 mPinSemaphore.drainPermits();
572                 mPinSemaphore.reducePermits(unpinnedReservedSlotCount);
573                 mClosed = false;
574             }
575         }
576     }
577 
578     /**
579      * Releases a pinned element for the given key.
580      * <p/>
581      * If element is unpinned, it is not released.
582      *
583      * @param key the key of the element, if the element is not present an
584      *            {@link IllegalArgumentException} is thrown.
585      */
releaseIfPinned(long key)586     public void releaseIfPinned(long key) {
587         synchronized (mLock) {
588             Pinnable<E> element = mElements.get(key);
589 
590             if (element == null) {
591                 throw new IllegalArgumentException("Invalid key." + key);
592             }
593 
594             if (element.isPinned()) {
595                 release(key);
596             }
597         }
598     }
599 
600     /**
601      * Releases all pinned elements in the buffer.
602      * <p/>
603      * Note: it only calls {@link #release(long)} only once on a pinned element.
604      */
releaseAll()605     public void releaseAll() {
606         synchronized (mSwapLock) {
607             synchronized (mLock) {
608                 if (mClosed || mElements.isEmpty()
609                         || mElements.size() == mUnpinnedElements.size()) {
610                     return;
611                 }
612                 for (java.util.Map.Entry<Long, Pinnable<E>> entry : mElements.entrySet()) {
613                     if (entry.getValue().isPinned()) {
614                         release(entry.getKey());
615                     }
616                 }
617             }
618         }
619     }
620 
notifyPinStateChange(final boolean pinsAvailable)621     private void notifyPinStateChange(final boolean pinsAvailable) {
622         synchronized (mLock) {
623             // We must synchronize on mPinStateHandler and mPinStateListener.
624             if (mPinStateHandler != null) {
625                 final PinStateListener listener = mPinStateListener;
626                 mPinStateHandler.post(new Runnable() {
627                         @Override
628                     public void run() {
629                         listener.onPinStateChange(pinsAvailable);
630                     }
631                 });
632             }
633         }
634     }
635 }
636