/*
 * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

/**
 * Abstract base class for most fork-join tasks used to implement stream ops.
 * Manages splitting logic, tracking of child tasks, and intermediate results.
 * Each task is associated with a {@link Spliterator} that describes the portion
 * of the input associated with the subtree rooted at this task.
 * Tasks may be leaf nodes (which will traverse the elements of
 * the {@code Spliterator}) or internal nodes (which split the
 * {@code Spliterator} into multiple child tasks).
 *
 * @implNote
 * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
 * where each task has a semaphore-like count of uncompleted children, and the
 * task is implicitly completed and notified when its last child completes.
 * Internal node tasks will likely override the {@code onCompletion} method from
 * {@code CountedCompleter} to merge the results from child tasks into the
 * current task's result.
 *
 * <p>Splitting and setting up the child task links is done by {@code compute()}
 * for internal nodes.  Both the {@code leftChild} and {@code rightChild} fields
 * of an internal node are assigned before either child is forked or processed.
 *
 * <p>For example, a task that performs a reduce would override {@code doLeaf()}
 * to perform a reduction on that leaf node's chunk using the
 * {@code Spliterator}, and override {@code onCompletion()} to merge the results
 * of the child tasks for internal nodes:
 *
 * <pre>{@code
 *     protected S doLeaf() {
 *         spliterator.forEachRemaining(...);
 *         return localReductionResult;
 *     }
 *
 *     public void onCompletion(CountedCompleter<?> caller) {
 *         if (!isLeaf()) {
 *             setLocalResult(combine(leftChild.getLocalResult(),
 *                                    rightChild.getLocalResult()));
 *         }
 *         // Must come last: clears the spliterator and the child links.
 *         super.onCompletion(caller);
 *     }
 * }</pre>
 *
 * <p>Serialization is not supported as there is no intention to serialize
 * tasks managed by stream ops.
 *
 * @param <P_IN> Type of elements input to the pipeline
 * @param <P_OUT> Type of elements output from the pipeline
 * @param <R> Type of intermediate result, which may be different from operation
 *        result type
 * @param <K> Type of parent, child and sibling tasks
 * @since 1.8
 */
@SuppressWarnings("serial")
abstract class AbstractTask<P_IN, P_OUT, R,
                            K extends AbstractTask<P_IN, P_OUT, R, K>>
        extends CountedCompleter<R> {

    /**
     * Leaf target used when the calling thread is not a
     * {@link ForkJoinWorkerThread}: four times the common pool parallelism.
     */
    private static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

    /** The pipeline helper, common to all tasks in a computation */
    protected final PipelineHelper<P_OUT> helper;

    /**
     * The spliterator for the portion of the input associated with the subtree
     * rooted at this task
     */
    protected Spliterator<P_IN> spliterator;

    /** Target leaf size, common to all tasks in a computation */
    protected long targetSize; // may be lazily initialized

    /**
     * The left child.
     * null if no children
     * if non-null rightChild is non-null
     */
    protected K leftChild;

    /**
     * The right child.
     * null if no children
     * if non-null leftChild is non-null
     */
    protected K rightChild;

    /** The result of this node, if completed */
    private R localResult;

    /**
     * Constructor for root nodes.
     *
     * @param helper The {@code PipelineHelper} describing the stream pipeline
     *               up to this operation
     * @param spliterator The {@code Spliterator} describing the source for this
     *                    pipeline
     */
    protected AbstractTask(PipelineHelper<P_OUT> helper,
                           Spliterator<P_IN> spliterator) {
        super(null);
        this.helper = helper;
        this.spliterator = spliterator;
        // Zero marks "not yet initialized"; see getTargetSize(long).
        this.targetSize = 0L;
    }

    /**
     * Constructor for non-root nodes.  The helper and target size are
     * inherited from the parent.
     *
     * @param parent this node's parent task
     * @param spliterator {@code Spliterator} describing the subtree rooted at
     *        this node, obtained by splitting the parent {@code Spliterator}
     */
    protected AbstractTask(K parent,
                           Spliterator<P_IN> spliterator) {
        super(parent);
        this.spliterator = spliterator;
        this.helper = parent.helper;
        this.targetSize = parent.targetSize;
    }

    /**
     * Default target of leaf tasks for parallel decomposition.
     * To allow load balancing, we over-partition, currently to approximately
     * four tasks per processor, which enables others to help out
     * if leaf tasks are uneven or some processors are otherwise busy.
     *
     * @return four times the parallelism of the current thread's pool when
     *         called from a {@link ForkJoinWorkerThread}, otherwise four times
     *         the common pool parallelism
     */
    public static int getLeafTarget() {
        Thread t = Thread.currentThread();
        if (t instanceof ForkJoinWorkerThread) {
            return ((ForkJoinWorkerThread) t).getPool().getParallelism() << 2;
        } else {
            return LEAF_TARGET;
        }
    }

    /**
     * Constructs a new node of type {@code K} whose parent is the receiver;
     * must call the {@code AbstractTask(K, Spliterator)} constructor with the
     * receiver and the provided {@code Spliterator}.
     *
     * @param spliterator {@code Spliterator} describing the subtree rooted at
     *        this node, obtained by splitting the parent {@code Spliterator}
     * @return newly constructed child node
     */
    protected abstract K makeChild(Spliterator<P_IN> spliterator);

    /**
     * Computes the result associated with a leaf node.  Will be called by
     * {@code compute()} and the result passed to {@code setLocalResult()}
     *
     * @return the computed result of a leaf node
     */
    protected abstract R doLeaf();

    /**
     * Returns a suggested target leaf size based on the initial size estimate.
     *
     * @param sizeEstimate the estimated number of elements in the source
     * @return suggested target leaf size; {@code sizeEstimate} divided by
     *         {@link #getLeafTarget()}, but never less than 1
     */
    public static long suggestTargetSize(long sizeEstimate) {
        long est = sizeEstimate / getLeafTarget();
        return est > 0L ? est : 1L;
    }

    /**
     * Returns the targetSize, initializing it via the supplied
     * size estimate if not already initialized.
     *
     * @param sizeEstimate size estimate used to compute the target size when
     *        {@code targetSize} is still zero
     * @return the (possibly newly initialized) target leaf size
     */
    protected final long getTargetSize(long sizeEstimate) {
        long s;
        return ((s = targetSize) != 0 ? s :
                (targetSize = suggestTargetSize(sizeEstimate)));
    }

    /**
     * Returns the local result, if any.  Subclasses should use
     * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
     * results.  This returns the local result so that calls from within the
     * fork-join framework will return the correct result.
     *
     * @return local result for this node previously stored with
     * {@link #setLocalResult}
     */
    @Override
    public R getRawResult() {
        return localResult;
    }

    /**
     * Does nothing; instead, subclasses should use
     * {@link #setLocalResult(Object)} to manage results.
     *
     * @param result must be null, or an exception is thrown (this is a safety
     *        tripwire to detect when {@code setRawResult()} is being used
     *        instead of {@code setLocalResult()})
     * @throws IllegalStateException if {@code result} is non-null
     */
    @Override
    protected void setRawResult(R result) {
        if (result != null) {
            throw new IllegalStateException();
        }
    }

    /**
     * Retrieves a result previously stored with {@link #setLocalResult}
     *
     * @return local result for this node previously stored with
     * {@link #setLocalResult}
     */
    protected R getLocalResult() {
        return localResult;
    }

    /**
     * Associates the result with the task, can be retrieved with
     * {@link #getLocalResult}
     *
     * @param localResult local result for this node
     */
    protected void setLocalResult(R localResult) {
        this.localResult = localResult;
    }

    /**
     * Indicates whether this task is a leaf node.  (Only valid after
     * {@link #compute} has been called on this node, and before
     * {@link #onCompletion} clears the child links.)  If the node is not a
     * leaf node, then both {@code leftChild} and {@code rightChild} will be
     * non-null.
     *
     * @return {@code true} if this task is a leaf node
     */
    protected boolean isLeaf() {
        return leftChild == null;
    }

    /**
     * Indicates whether this task is the root node
     *
     * @return {@code true} if this task is the root node.
     */
    protected boolean isRoot() {
        return getParent() == null;
    }

    /**
     * Returns the parent of this task, or null if this task is the root
     *
     * @return the parent of this task, or null if this task is the root
     */
    @SuppressWarnings("unchecked")
    protected K getParent() {
        return (K) getCompleter();
    }

    /**
     * Decides whether or not to split a task further or compute it
     * directly. If computing directly, calls {@code doLeaf} and passes
     * the result to {@code setLocalResult}. Otherwise splits off
     * subtasks, forking one and continuing as the other.
     *
     * <p> The method is structured to conserve resources across a
     * range of uses.  The loop continues with one of the child tasks
     * when split, to avoid deep recursion. To cope with spliterators
     * that may be systematically biased toward left-heavy or
     * right-heavy splits, we alternate which child is forked versus
     * continued in the loop.
     */
    @Override
    public void compute() {
        Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
        long sizeEstimate = rs.estimateSize();
        long sizeThreshold = getTargetSize(sizeEstimate);
        boolean forkRight = false;
        @SuppressWarnings("unchecked") K task = (K) this;
        while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
            K left, right, taskToFork;
            // Link both children before either one starts running.
            task.leftChild  = left = task.makeChild(ls);
            task.rightChild = right = task.makeChild(rs);
            // One pending child: the forked one. The other is continued
            // in this loop and completes via the final tryComplete().
            task.setPendingCount(1);
            if (forkRight) {
                forkRight = false;
                rs = ls;
                task = left;
                taskToFork = right;
            } else {
                forkRight = true;
                task = right;
                taskToFork = left;
            }
            taskToFork.fork();
            sizeEstimate = rs.estimateSize();
        }
        task.setLocalResult(task.doLeaf());
        task.tryComplete();
    }

    /**
     * {@inheritDoc}
     *
     * @implNote
     * Clears spliterator and children fields.  Overriders MUST call
     * {@code super.onCompletion} as the last thing they do if they want these
     * cleared.
     */
    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        spliterator = null;
        leftChild = rightChild = null;
    }

    /**
     * Returns whether this node is a "leftmost" node -- whether the path from
     * the root to this node involves only traversing leftmost child links.  For
     * a leaf node, this means it is the first leaf node in the encounter order.
     *
     * @return {@code true} if this node is a "leftmost" node
     */
    protected boolean isLeftmostNode() {
        @SuppressWarnings("unchecked")
        K node = (K) this;
        // Walk up to the root; any step taken from a non-left child disqualifies.
        while (node != null) {
            K parent = node.getParent();
            if (parent != null && parent.leftChild != node) {
                return false;
            }
            node = parent;
        }
        return true;
    }
}