1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 
15 package com.google.common.collect;
16 
17 import com.google.common.annotations.Beta;
18 import com.google.common.base.Preconditions;
19 
20 import java.util.ArrayDeque;
21 import java.util.Collection;
22 import java.util.Deque;
23 import java.util.PriorityQueue;
24 import java.util.Queue;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.LinkedBlockingDeque;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.PriorityBlockingQueue;
31 import java.util.concurrent.SynchronousQueue;
32 import java.util.concurrent.TimeUnit;
33 
34 /**
35  * Static utility methods pertaining to {@link Queue} and {@link Deque} instances.
36  * Also see this class's counterparts {@link Lists}, {@link Sets}, and {@link Maps}.
37  *
38  * @author Kurt Alfred Kluever
39  * @since 11.0
40  */
41 public final class Queues {
Queues()42   private Queues() {}
43 
44   // ArrayBlockingQueue
45 
46   /**
47    * Creates an empty {@code ArrayBlockingQueue} with the given (fixed) capacity
48    * and nonfair access policy.
49    */
newArrayBlockingQueue(int capacity)50   public static <E> ArrayBlockingQueue<E> newArrayBlockingQueue(int capacity) {
51     return new ArrayBlockingQueue<E>(capacity);
52   }
53 
54   // ArrayDeque
55 
56   /**
57    * Creates an empty {@code ArrayDeque}.
58    *
59    * @since 12.0
60    */
newArrayDeque()61   public static <E> ArrayDeque<E> newArrayDeque() {
62     return new ArrayDeque<E>();
63   }
64 
65   /**
66    * Creates an {@code ArrayDeque} containing the elements of the specified iterable,
67    * in the order they are returned by the iterable's iterator.
68    *
69    * @since 12.0
70    */
newArrayDeque(Iterable<? extends E> elements)71   public static <E> ArrayDeque<E> newArrayDeque(Iterable<? extends E> elements) {
72     if (elements instanceof Collection) {
73       return new ArrayDeque<E>(Collections2.cast(elements));
74     }
75     ArrayDeque<E> deque = new ArrayDeque<E>();
76     Iterables.addAll(deque, elements);
77     return deque;
78   }
79 
80   // ConcurrentLinkedQueue
81 
82   /**
83    * Creates an empty {@code ConcurrentLinkedQueue}.
84    */
newConcurrentLinkedQueue()85   public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue() {
86     return new ConcurrentLinkedQueue<E>();
87   }
88 
89   /**
90    * Creates a {@code ConcurrentLinkedQueue} containing the elements of the specified iterable,
91    * in the order they are returned by the iterable's iterator.
92    */
newConcurrentLinkedQueue( Iterable<? extends E> elements)93   public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue(
94       Iterable<? extends E> elements) {
95     if (elements instanceof Collection) {
96       return new ConcurrentLinkedQueue<E>(Collections2.cast(elements));
97     }
98     ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
99     Iterables.addAll(queue, elements);
100     return queue;
101   }
102 
103   // LinkedBlockingDeque
104 
105   /**
106    * Creates an empty {@code LinkedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE}.
107    *
108    * @since 12.0
109    */
newLinkedBlockingDeque()110   public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque() {
111     return new LinkedBlockingDeque<E>();
112   }
113 
114   /**
115    * Creates an empty {@code LinkedBlockingDeque} with the given (fixed) capacity.
116    *
117    * @throws IllegalArgumentException if {@code capacity} is less than 1
118    * @since 12.0
119    */
newLinkedBlockingDeque(int capacity)120   public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(int capacity) {
121     return new LinkedBlockingDeque<E>(capacity);
122   }
123 
124   /**
125    * Creates a {@code LinkedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE},
126    * containing the elements of the specified iterable,
127    * in the order they are returned by the iterable's iterator.
128    *
129    * @since 12.0
130    */
newLinkedBlockingDeque(Iterable<? extends E> elements)131   public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(Iterable<? extends E> elements) {
132     if (elements instanceof Collection) {
133       return new LinkedBlockingDeque<E>(Collections2.cast(elements));
134     }
135     LinkedBlockingDeque<E> deque = new LinkedBlockingDeque<E>();
136     Iterables.addAll(deque, elements);
137     return deque;
138   }
139 
140   // LinkedBlockingQueue
141 
142   /**
143    * Creates an empty {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE}.
144    */
newLinkedBlockingQueue()145   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue() {
146     return new LinkedBlockingQueue<E>();
147   }
148 
149   /**
150    * Creates an empty {@code LinkedBlockingQueue} with the given (fixed) capacity.
151    *
152    * @throws IllegalArgumentException if {@code capacity} is less than 1
153    */
newLinkedBlockingQueue(int capacity)154   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(int capacity) {
155     return new LinkedBlockingQueue<E>(capacity);
156   }
157 
158   /**
159    * Creates a {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE},
160    * containing the elements of the specified iterable,
161    * in the order they are returned by the iterable's iterator.
162    *
163    * @param elements the elements that the queue should contain, in order
164    * @return a new {@code LinkedBlockingQueue} containing those elements
165    */
newLinkedBlockingQueue(Iterable<? extends E> elements)166   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(Iterable<? extends E> elements) {
167     if (elements instanceof Collection) {
168       return new LinkedBlockingQueue<E>(Collections2.cast(elements));
169     }
170     LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>();
171     Iterables.addAll(queue, elements);
172     return queue;
173   }
174 
175   // LinkedList: see {@link com.google.common.collect.Lists}
176 
177   // PriorityBlockingQueue
178 
179   /**
180    * Creates an empty {@code PriorityBlockingQueue} with the ordering given by its
181    * elements' natural ordering.
182    *
183    * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
184    */
newPriorityBlockingQueue()185   public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue() {
186     return new PriorityBlockingQueue<E>();
187   }
188 
189   /**
190    * Creates a {@code PriorityBlockingQueue} containing the given elements.
191    *
192    * <b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue},
193    * this priority queue will be ordered according to the same ordering.
194    *
195    * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
196    */
newPriorityBlockingQueue( Iterable<? extends E> elements)197   public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue(
198       Iterable<? extends E> elements) {
199     if (elements instanceof Collection) {
200       return new PriorityBlockingQueue<E>(Collections2.cast(elements));
201     }
202     PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>();
203     Iterables.addAll(queue, elements);
204     return queue;
205   }
206 
207   // PriorityQueue
208 
209   /**
210    * Creates an empty {@code PriorityQueue} with the ordering given by its
211    * elements' natural ordering.
212    *
213    * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
214    */
newPriorityQueue()215   public static <E extends Comparable> PriorityQueue<E> newPriorityQueue() {
216     return new PriorityQueue<E>();
217   }
218 
219   /**
220    * Creates a {@code PriorityQueue} containing the given elements.
221    *
222    * <b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue},
223    * this priority queue will be ordered according to the same ordering.
224    *
225    * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
226    */
newPriorityQueue( Iterable<? extends E> elements)227   public static <E extends Comparable> PriorityQueue<E> newPriorityQueue(
228       Iterable<? extends E> elements) {
229     if (elements instanceof Collection) {
230       return new PriorityQueue<E>(Collections2.cast(elements));
231     }
232     PriorityQueue<E> queue = new PriorityQueue<E>();
233     Iterables.addAll(queue, elements);
234     return queue;
235   }
236 
237   // SynchronousQueue
238 
239   /**
240    * Creates an empty {@code SynchronousQueue} with nonfair access policy.
241    */
newSynchronousQueue()242   public static <E> SynchronousQueue<E> newSynchronousQueue() {
243     return new SynchronousQueue<E>();
244   }
245 
246   /**
247    * Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested
248    * {@code numElements} elements are not available, it will wait for them up to the specified
249    * timeout.
250    *
251    * @param q the blocking queue to be drained
252    * @param buffer where to add the transferred elements
253    * @param numElements the number of elements to be waited for
254    * @param timeout how long to wait before giving up, in units of {@code unit}
255    * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
256    * @return the number of elements transferred
257    * @throws InterruptedException if interrupted while waiting
258    */
259   @Beta
drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements, long timeout, TimeUnit unit)260   public static <E> int drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements,
261       long timeout, TimeUnit unit) throws InterruptedException {
262     Preconditions.checkNotNull(buffer);
263     /*
264      * This code performs one System.nanoTime() more than necessary, and in return, the time to
265      * execute Queue#drainTo is not added *on top* of waiting for the timeout (which could make
266      * the timeout arbitrarily inaccurate, given a queue that is slow to drain).
267      */
268     long deadline = System.nanoTime() + unit.toNanos(timeout);
269     int added = 0;
270     while (added < numElements) {
271       // we could rely solely on #poll, but #drainTo might be more efficient when there are multiple
272       // elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
273       added += q.drainTo(buffer, numElements - added);
274       if (added < numElements) { // not enough elements immediately available; will have to poll
275         E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
276         if (e == null) {
277           break; // we already waited enough, and there are no more elements in sight
278         }
279         buffer.add(e);
280         added++;
281       }
282     }
283     return added;
284   }
285 
286   /**
287    * Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)},
288    * but with a different behavior in case it is interrupted while waiting. In that case, the
289    * operation will continue as usual, and in the end the thread's interruption status will be set
290    * (no {@code InterruptedException} is thrown).
291    *
292    * @param q the blocking queue to be drained
293    * @param buffer where to add the transferred elements
294    * @param numElements the number of elements to be waited for
295    * @param timeout how long to wait before giving up, in units of {@code unit}
296    * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
297    * @return the number of elements transferred
298    */
299   @Beta
drainUninterruptibly(BlockingQueue<E> q, Collection<? super E> buffer, int numElements, long timeout, TimeUnit unit)300   public static <E> int drainUninterruptibly(BlockingQueue<E> q, Collection<? super E> buffer,
301       int numElements, long timeout, TimeUnit unit) {
302     Preconditions.checkNotNull(buffer);
303     long deadline = System.nanoTime() + unit.toNanos(timeout);
304     int added = 0;
305     boolean interrupted = false;
306     try {
307       while (added < numElements) {
308         // we could rely solely on #poll, but #drainTo might be more efficient when there are
309         // multiple elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
310         added += q.drainTo(buffer, numElements - added);
311         if (added < numElements) { // not enough elements immediately available; will have to poll
312           E e; // written exactly once, by a successful (uninterrupted) invocation of #poll
313           while (true) {
314             try {
315               e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
316               break;
317             } catch (InterruptedException ex) {
318               interrupted = true; // note interruption and retry
319             }
320           }
321           if (e == null) {
322             break; // we already waited enough, and there are no more elements in sight
323           }
324           buffer.add(e);
325           added++;
326         }
327       }
328     } finally {
329       if (interrupted) {
330         Thread.currentThread().interrupt();
331       }
332     }
333     return added;
334   }
335 
336   /**
337    * Returns a synchronized (thread-safe) queue backed by the specified queue. In order to
338    * guarantee serial access, it is critical that <b>all</b> access to the backing queue is
339    * accomplished through the returned queue.
340    *
341    * <p>It is imperative that the user manually synchronize on the returned queue when accessing
342    * the queue's iterator: <pre>   {@code
343    *
344    *   Queue<E> queue = Queues.synchronizedQueue(MinMaxPriorityQueue.<E>create());
345    *   ...
346    *   queue.add(element);  // Needn't be in synchronized block
347    *   ...
348    *   synchronized (queue) {  // Must synchronize on queue!
349    *     Iterator<E> i = queue.iterator(); // Must be in synchronized block
350    *     while (i.hasNext()) {
351    *       foo(i.next());
352    *     }
353    *   }}</pre>
354    *
355    * <p>Failure to follow this advice may result in non-deterministic behavior.
356    *
357    * <p>The returned queue will be serializable if the specified queue is serializable.
358    *
359    * @param queue the queue to be wrapped in a synchronized view
360    * @return a synchronized view of the specified queue
361    * @since 14.0
362    */
synchronizedQueue(Queue<E> queue)363   public static <E> Queue<E> synchronizedQueue(Queue<E> queue) {
364     return Synchronized.queue(queue, null);
365   }
366 
367   /**
368    * Returns a synchronized (thread-safe) deque backed by the specified deque. In order to
369    * guarantee serial access, it is critical that <b>all</b> access to the backing deque is
370    * accomplished through the returned deque.
371    *
372    * <p>It is imperative that the user manually synchronize on the returned deque when accessing
373    * any of the deque's iterators: <pre>   {@code
374    *
375    *   Deque<E> deque = Queues.synchronizedDeque(Queues.<E>newArrayDeque());
376    *   ...
377    *   deque.add(element);  // Needn't be in synchronized block
378    *   ...
379    *   synchronized (deque) {  // Must synchronize on deque!
380    *     Iterator<E> i = deque.iterator(); // Must be in synchronized block
381    *     while (i.hasNext()) {
382    *       foo(i.next());
383    *     }
384    *   }}</pre>
385    *
386    * <p>Failure to follow this advice may result in non-deterministic behavior.
387    *
388    * <p>The returned deque will be serializable if the specified deque is serializable.
389    *
390    * @param deque the deque to be wrapped in a synchronized view
391    * @return a synchronized view of the specified deque
392    * @since 15.0
393    */
synchronizedDeque(Deque<E> deque)394   public static <E> Deque<E> synchronizedDeque(Deque<E> deque) {
395     return Synchronized.deque(deque, null);
396   }
397 }
398