1 /* GENERATED SOURCE. DO NOT MODIFY. */
2 /*
3  * Copyright (C) 2013 Square, Inc.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package com.android.okhttp;
18 
19 import com.android.okhttp.Call.AsyncCall;
20 import com.android.okhttp.internal.Util;
21 import com.android.okhttp.internal.http.HttpEngine;
22 import java.util.ArrayDeque;
23 import java.util.Deque;
24 import java.util.Iterator;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.SynchronousQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 
30 /**
31  * Policy on when async requests are executed.
32  *
33  * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you
34  * supply your own executor, it should be able to run {@linkplain #getMaxRequests the
35  * configured maximum} number of calls concurrently.
36  * @hide This class is not part of the Android public SDK API
37  */
38 public final class Dispatcher {
39   private int maxRequests = 64;
40   private int maxRequestsPerHost = 5;
41 
42   /** Executes calls. Created lazily. */
43   private ExecutorService executorService;
44 
45   /** Ready calls in the order they'll be run. */
46   private final Deque<AsyncCall> readyCalls = new ArrayDeque<>();
47 
48   /** Running calls. Includes canceled calls that haven't finished yet. */
49   private final Deque<AsyncCall> runningCalls = new ArrayDeque<>();
50 
51   /** In-flight synchronous calls. Includes canceled calls that haven't finished yet. */
52   private final Deque<Call> executedCalls = new ArrayDeque<>();
53 
Dispatcher(ExecutorService executorService)54   public Dispatcher(ExecutorService executorService) {
55     this.executorService = executorService;
56   }
57 
Dispatcher()58   public Dispatcher() {
59   }
60 
getExecutorService()61   public synchronized ExecutorService getExecutorService() {
62     if (executorService == null) {
63       executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
64           new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
65     }
66     return executorService;
67   }
68 
69   /**
70    * Set the maximum number of requests to execute concurrently. Above this
71    * requests queue in memory, waiting for the running calls to complete.
72    *
73    * <p>If more than {@code maxRequests} requests are in flight when this is
74    * invoked, those requests will remain in flight.
75    */
setMaxRequests(int maxRequests)76   public synchronized void setMaxRequests(int maxRequests) {
77     if (maxRequests < 1) {
78       throw new IllegalArgumentException("max < 1: " + maxRequests);
79     }
80     this.maxRequests = maxRequests;
81     promoteCalls();
82   }
83 
getMaxRequests()84   public synchronized int getMaxRequests() {
85     return maxRequests;
86   }
87 
88   /**
89    * Set the maximum number of requests for each host to execute concurrently.
90    * This limits requests by the URL's host name. Note that concurrent requests
91    * to a single IP address may still exceed this limit: multiple hostnames may
92    * share an IP address or be routed through the same HTTP proxy.
93    *
94    * <p>If more than {@code maxRequestsPerHost} requests are in flight when this
95    * is invoked, those requests will remain in flight.
96    */
setMaxRequestsPerHost(int maxRequestsPerHost)97   public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
98     if (maxRequestsPerHost < 1) {
99       throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
100     }
101     this.maxRequestsPerHost = maxRequestsPerHost;
102     promoteCalls();
103   }
104 
getMaxRequestsPerHost()105   public synchronized int getMaxRequestsPerHost() {
106     return maxRequestsPerHost;
107   }
108 
enqueue(AsyncCall call)109   synchronized void enqueue(AsyncCall call) {
110     if (runningCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
111       runningCalls.add(call);
112       getExecutorService().execute(call);
113     } else {
114       readyCalls.add(call);
115     }
116   }
117 
118   /** Cancel all calls with the tag {@code tag}. */
cancel(Object tag)119   public synchronized void cancel(Object tag) {
120     for (AsyncCall call : readyCalls) {
121       if (Util.equal(tag, call.tag())) {
122         call.cancel();
123       }
124     }
125 
126     for (AsyncCall call : runningCalls) {
127       if (Util.equal(tag, call.tag())) {
128         call.get().canceled = true;
129         HttpEngine engine = call.get().engine;
130         if (engine != null) engine.cancel();
131       }
132     }
133 
134     for (Call call : executedCalls) {
135       if (Util.equal(tag, call.tag())) {
136         call.cancel();
137       }
138     }
139   }
140 
141   /** Used by {@code AsyncCall#run} to signal completion. */
finished(AsyncCall call)142   synchronized void finished(AsyncCall call) {
143     if (!runningCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
144     promoteCalls();
145   }
146 
promoteCalls()147   private void promoteCalls() {
148     if (runningCalls.size() >= maxRequests) return; // Already running max capacity.
149     if (readyCalls.isEmpty()) return; // No ready calls to promote.
150 
151     for (Iterator<AsyncCall> i = readyCalls.iterator(); i.hasNext(); ) {
152       AsyncCall call = i.next();
153 
154       if (runningCallsForHost(call) < maxRequestsPerHost) {
155         i.remove();
156         runningCalls.add(call);
157         getExecutorService().execute(call);
158       }
159 
160       if (runningCalls.size() >= maxRequests) return; // Reached max capacity.
161     }
162   }
163 
164   /** Returns the number of running calls that share a host with {@code call}. */
runningCallsForHost(AsyncCall call)165   private int runningCallsForHost(AsyncCall call) {
166     int result = 0;
167     for (AsyncCall c : runningCalls) {
168       if (c.host().equals(call.host())) result++;
169     }
170     return result;
171   }
172 
173   /** Used by {@code Call#execute} to signal it is in-flight. */
executed(Call call)174   synchronized void executed(Call call) {
175     executedCalls.add(call);
176   }
177 
178   /** Used by {@code Call#execute} to signal completion. */
finished(Call call)179   synchronized void finished(Call call) {
180     if (!executedCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
181   }
182 
getRunningCallCount()183   public synchronized int getRunningCallCount() {
184     return runningCalls.size();
185   }
186 
getQueuedCallCount()187   public synchronized int getQueuedCallCount() {
188     return readyCalls.size();
189   }
190 }
191