1 /*
2  * Copyright (C) 2011 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.volley;
18 
19 import android.os.Handler;
20 import android.os.Looper;
21 import androidx.annotation.IntDef;
22 import java.lang.annotation.Retention;
23 import java.lang.annotation.RetentionPolicy;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.PriorityBlockingQueue;
29 import java.util.concurrent.atomic.AtomicInteger;
30 
31 /**
32  * A request dispatch queue with a thread pool of dispatchers.
33  *
34  * <p>Calling {@link #add(Request)} will enqueue the given Request for dispatch, resolving from
35  * either cache or network on a worker thread, and then delivering a parsed response on the main
36  * thread.
37  */
38 public class RequestQueue {
39 
40     /** Callback interface for completed requests. */
41     // TODO: This should not be a generic class, because the request type can't be determined at
42     // compile time, so all calls to onRequestFinished are unsafe. However, changing this would be
43     // an API-breaking change. See also: https://github.com/google/volley/pull/109
44     @Deprecated // Use RequestEventListener instead.
45     public interface RequestFinishedListener<T> {
46         /** Called when a request has finished processing. */
onRequestFinished(Request<T> request)47         void onRequestFinished(Request<T> request);
48     }
49 
50     /** Request event types the listeners {@link RequestEventListener} will be notified about. */
51     @Retention(RetentionPolicy.SOURCE)
52     @IntDef({
53         RequestEvent.REQUEST_QUEUED,
54         RequestEvent.REQUEST_CACHE_LOOKUP_STARTED,
55         RequestEvent.REQUEST_CACHE_LOOKUP_FINISHED,
56         RequestEvent.REQUEST_NETWORK_DISPATCH_STARTED,
57         RequestEvent.REQUEST_NETWORK_DISPATCH_FINISHED,
58         RequestEvent.REQUEST_FINISHED
59     })
60     public @interface RequestEvent {
61         /** The request was added to the queue. */
62         public static final int REQUEST_QUEUED = 0;
63         /** Cache lookup started for the request. */
64         public static final int REQUEST_CACHE_LOOKUP_STARTED = 1;
65         /**
66          * Cache lookup finished for the request and cached response is delivered or request is
67          * queued for network dispatching.
68          */
69         public static final int REQUEST_CACHE_LOOKUP_FINISHED = 2;
70         /** Network dispatch started for the request. */
71         public static final int REQUEST_NETWORK_DISPATCH_STARTED = 3;
72         /** The network dispatch finished for the request and response (if any) is delivered. */
73         public static final int REQUEST_NETWORK_DISPATCH_FINISHED = 4;
74         /**
75          * All the work associated with the request is finished and request is removed from all the
76          * queues.
77          */
78         public static final int REQUEST_FINISHED = 5;
79     }
80 
81     /** Callback interface for request life cycle events. */
82     public interface RequestEventListener {
83         /**
84          * Called on every request lifecycle event. Can be called from different threads. The call
85          * is blocking request processing, so any processing should be kept at minimum or moved to
86          * another thread.
87          */
onRequestEvent(Request<?> request, @RequestEvent int event)88         void onRequestEvent(Request<?> request, @RequestEvent int event);
89     }
90 
91     /** Used for generating monotonically-increasing sequence numbers for requests. */
92     private final AtomicInteger mSequenceGenerator = new AtomicInteger();
93 
94     /**
95      * The set of all requests currently being processed by this RequestQueue. A Request will be in
96      * this set if it is waiting in any queue or currently being processed by any dispatcher.
97      */
98     private final Set<Request<?>> mCurrentRequests = new HashSet<>();
99 
100     /** The cache triage queue. */
101     private final PriorityBlockingQueue<Request<?>> mCacheQueue = new PriorityBlockingQueue<>();
102 
103     /** The queue of requests that are actually going out to the network. */
104     private final PriorityBlockingQueue<Request<?>> mNetworkQueue = new PriorityBlockingQueue<>();
105 
106     /** Number of network request dispatcher threads to start. */
107     private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;
108 
109     /** Cache interface for retrieving and storing responses. */
110     private final Cache mCache;
111 
112     /** Network interface for performing requests. */
113     private final Network mNetwork;
114 
115     /** Response delivery mechanism. */
116     private final ResponseDelivery mDelivery;
117 
118     /** The network dispatchers. */
119     private final NetworkDispatcher[] mDispatchers;
120 
121     /** The cache dispatcher. */
122     private CacheDispatcher mCacheDispatcher;
123 
124     private final List<RequestFinishedListener> mFinishedListeners = new ArrayList<>();
125 
126     /** Collection of listeners for request life cycle events. */
127     private final List<RequestEventListener> mEventListeners = new ArrayList<>();
128 
129     /**
130      * Creates the worker pool. Processing will not begin until {@link #start()} is called.
131      *
132      * @param cache A Cache to use for persisting responses to disk
133      * @param network A Network interface for performing HTTP requests
134      * @param threadPoolSize Number of network dispatcher threads to create
135      * @param delivery A ResponseDelivery interface for posting responses and errors
136      */
RequestQueue( Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery)137     public RequestQueue(
138             Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery) {
139         mCache = cache;
140         mNetwork = network;
141         mDispatchers = new NetworkDispatcher[threadPoolSize];
142         mDelivery = delivery;
143     }
144 
145     /**
146      * Creates the worker pool. Processing will not begin until {@link #start()} is called.
147      *
148      * @param cache A Cache to use for persisting responses to disk
149      * @param network A Network interface for performing HTTP requests
150      * @param threadPoolSize Number of network dispatcher threads to create
151      */
RequestQueue(Cache cache, Network network, int threadPoolSize)152     public RequestQueue(Cache cache, Network network, int threadPoolSize) {
153         this(
154                 cache,
155                 network,
156                 threadPoolSize,
157                 new ExecutorDelivery(new Handler(Looper.getMainLooper())));
158     }
159 
160     /**
161      * Creates the worker pool. Processing will not begin until {@link #start()} is called.
162      *
163      * @param cache A Cache to use for persisting responses to disk
164      * @param network A Network interface for performing HTTP requests
165      */
RequestQueue(Cache cache, Network network)166     public RequestQueue(Cache cache, Network network) {
167         this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
168     }
169 
170     /** Starts the dispatchers in this queue. */
start()171     public void start() {
172         stop(); // Make sure any currently running dispatchers are stopped.
173         // Create the cache dispatcher and start it.
174         mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
175         mCacheDispatcher.start();
176 
177         // Create network dispatchers (and corresponding threads) up to the pool size.
178         for (int i = 0; i < mDispatchers.length; i++) {
179             NetworkDispatcher networkDispatcher =
180                     new NetworkDispatcher(mNetworkQueue, mNetwork, mCache, mDelivery);
181             mDispatchers[i] = networkDispatcher;
182             networkDispatcher.start();
183         }
184     }
185 
186     /** Stops the cache and network dispatchers. */
stop()187     public void stop() {
188         if (mCacheDispatcher != null) {
189             mCacheDispatcher.quit();
190         }
191         for (final NetworkDispatcher mDispatcher : mDispatchers) {
192             if (mDispatcher != null) {
193                 mDispatcher.quit();
194             }
195         }
196     }
197 
198     /** Gets a sequence number. */
getSequenceNumber()199     public int getSequenceNumber() {
200         return mSequenceGenerator.incrementAndGet();
201     }
202 
203     /** Gets the {@link Cache} instance being used. */
getCache()204     public Cache getCache() {
205         return mCache;
206     }
207 
208     /**
209      * A simple predicate or filter interface for Requests, for use by {@link
210      * RequestQueue#cancelAll(RequestFilter)}.
211      */
212     public interface RequestFilter {
apply(Request<?> request)213         boolean apply(Request<?> request);
214     }
215 
216     /**
217      * Cancels all requests in this queue for which the given filter applies.
218      *
219      * @param filter The filtering function to use
220      */
cancelAll(RequestFilter filter)221     public void cancelAll(RequestFilter filter) {
222         synchronized (mCurrentRequests) {
223             for (Request<?> request : mCurrentRequests) {
224                 if (filter.apply(request)) {
225                     request.cancel();
226                 }
227             }
228         }
229     }
230 
231     /**
232      * Cancels all requests in this queue with the given tag. Tag must be non-null and equality is
233      * by identity.
234      */
cancelAll(final Object tag)235     public void cancelAll(final Object tag) {
236         if (tag == null) {
237             throw new IllegalArgumentException("Cannot cancelAll with a null tag");
238         }
239         cancelAll(
240                 new RequestFilter() {
241                     @Override
242                     public boolean apply(Request<?> request) {
243                         return request.getTag() == tag;
244                     }
245                 });
246     }
247 
248     /**
249      * Adds a Request to the dispatch queue.
250      *
251      * @param request The request to service
252      * @return The passed-in request
253      */
add(Request<T> request)254     public <T> Request<T> add(Request<T> request) {
255         // Tag the request as belonging to this queue and add it to the set of current requests.
256         request.setRequestQueue(this);
257         synchronized (mCurrentRequests) {
258             mCurrentRequests.add(request);
259         }
260 
261         // Process requests in the order they are added.
262         request.setSequence(getSequenceNumber());
263         request.addMarker("add-to-queue");
264         sendRequestEvent(request, RequestEvent.REQUEST_QUEUED);
265 
266         beginRequest(request);
267         return request;
268     }
269 
beginRequest(Request<T> request)270     <T> void beginRequest(Request<T> request) {
271         // If the request is uncacheable, skip the cache queue and go straight to the network.
272         if (!request.shouldCache()) {
273             sendRequestOverNetwork(request);
274         } else {
275             mCacheQueue.add(request);
276         }
277     }
278 
279     /**
280      * Called from {@link Request#finish(String)}, indicating that processing of the given request
281      * has finished.
282      */
283     @SuppressWarnings("unchecked") // see above note on RequestFinishedListener
finish(Request<T> request)284     <T> void finish(Request<T> request) {
285         // Remove from the set of requests currently being processed.
286         synchronized (mCurrentRequests) {
287             mCurrentRequests.remove(request);
288         }
289         synchronized (mFinishedListeners) {
290             for (RequestFinishedListener<T> listener : mFinishedListeners) {
291                 listener.onRequestFinished(request);
292             }
293         }
294         sendRequestEvent(request, RequestEvent.REQUEST_FINISHED);
295     }
296 
297     /** Sends a request life cycle event to the listeners. */
sendRequestEvent(Request<?> request, @RequestEvent int event)298     void sendRequestEvent(Request<?> request, @RequestEvent int event) {
299         synchronized (mEventListeners) {
300             for (RequestEventListener listener : mEventListeners) {
301                 listener.onRequestEvent(request, event);
302             }
303         }
304     }
305 
306     /** Add a listener for request life cycle events. */
addRequestEventListener(RequestEventListener listener)307     public void addRequestEventListener(RequestEventListener listener) {
308         synchronized (mEventListeners) {
309             mEventListeners.add(listener);
310         }
311     }
312 
313     /** Remove a listener for request life cycle events. */
removeRequestEventListener(RequestEventListener listener)314     public void removeRequestEventListener(RequestEventListener listener) {
315         synchronized (mEventListeners) {
316             mEventListeners.remove(listener);
317         }
318     }
319 
320     @Deprecated // Use RequestEventListener instead.
addRequestFinishedListener(RequestFinishedListener<T> listener)321     public <T> void addRequestFinishedListener(RequestFinishedListener<T> listener) {
322         synchronized (mFinishedListeners) {
323             mFinishedListeners.add(listener);
324         }
325     }
326 
327     /** Remove a RequestFinishedListener. Has no effect if listener was not previously added. */
328     @Deprecated // Use RequestEventListener instead.
removeRequestFinishedListener(RequestFinishedListener<T> listener)329     public <T> void removeRequestFinishedListener(RequestFinishedListener<T> listener) {
330         synchronized (mFinishedListeners) {
331             mFinishedListeners.remove(listener);
332         }
333     }
334 
getResponseDelivery()335     public ResponseDelivery getResponseDelivery() {
336         return mDelivery;
337     }
338 
sendRequestOverNetwork(Request<T> request)339     <T> void sendRequestOverNetwork(Request<T> request) {
340         mNetworkQueue.add(request);
341     }
342 }
343