1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
30 
31 /**
32  * Abstract base class for most fork-join tasks used to implement stream ops.
33  * Manages splitting logic, tracking of child tasks, and intermediate results.
34  * Each task is associated with a {@link Spliterator} that describes the portion
35  * of the input associated with the subtree rooted at this task.
36  * Tasks may be leaf nodes (which will traverse the elements of
37  * the {@code Spliterator}) or internal nodes (which split the
38  * {@code Spliterator} into multiple child tasks).
39  *
40  * @implNote
41  * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
42  * where each task has a semaphore-like count of uncompleted children, and the
43  * task is implicitly completed and notified when its last child completes.
44  * Internal node tasks will likely override the {@code onCompletion} method from
45  * {@code CountedCompleter} to merge the results from child tasks into the
46  * current task's result.
47  *
48  * <p>Splitting and setting up the child task links is done by {@code compute()}
49  * for internal nodes.  At {@code compute()} time for leaf nodes, it is
50  * guaranteed that the parent's child-related fields (including sibling links
51  * for the parent's children) will be set up for all children.
52  *
53  * <p>For example, a task that performs a reduce would override {@code doLeaf()}
54  * to perform a reduction on that leaf node's chunk using the
55  * {@code Spliterator}, and override {@code onCompletion()} to merge the results
56  * of the child tasks for internal nodes:
57  *
58  * <pre>{@code
59  *     protected S doLeaf() {
60  *         spliterator.forEach(...);
61  *         return localReductionResult;
62  *     }
63  *
64  *     public void onCompletion(CountedCompleter caller) {
65  *         if (!isLeaf()) {
66  *             ReduceTask<P_IN, P_OUT, T, R> child = children;
67  *             R result = child.getLocalResult();
68  *             child = child.nextSibling;
69  *             for (; child != null; child = child.nextSibling)
70  *                 result = combine(result, child.getLocalResult());
71  *             setLocalResult(result);
72  *         }
73  *     }
74  * }</pre>
75  *
76  * <p>Serialization is not supported as there is no intention to serialize
77  * tasks managed by stream ops.
78  *
79  * @param <P_IN> Type of elements input to the pipeline
80  * @param <P_OUT> Type of elements output from the pipeline
81  * @param <R> Type of intermediate result, which may be different from operation
82  *        result type
83  * @param <K> Type of parent, child and sibling tasks
84  * @since 1.8
85  */
86 @SuppressWarnings("serial")
87 abstract class AbstractTask<P_IN, P_OUT, R,
88                             K extends AbstractTask<P_IN, P_OUT, R, K>>
89         extends CountedCompleter<R> {
90 
91     /**
92      * Default target factor of leaf tasks for parallel decomposition.
93      * To allow load balancing, we over-partition, currently to approximately
94      * four tasks per processor, which enables others to help out
95      * if leaf tasks are uneven or some processors are otherwise busy.
96      */
97     static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
98 
99     /** The pipeline helper, common to all tasks in a computation */
100     protected final PipelineHelper<P_OUT> helper;
101 
102     /**
103      * The spliterator for the portion of the input associated with the subtree
104      * rooted at this task
105      */
106     protected Spliterator<P_IN> spliterator;
107 
108     /** Target leaf size, common to all tasks in a computation */
109     protected long targetSize; // may be laziliy initialized
110 
111     /**
112      * The left child.
113      * null if no children
114      * if non-null rightChild is non-null
115      */
116     protected K leftChild;
117 
118     /**
119      * The right child.
120      * null if no children
121      * if non-null leftChild is non-null
122      */
123     protected K rightChild;
124 
125     /** The result of this node, if completed */
126     private R localResult;
127 
128     /**
129      * Constructor for root nodes.
130      *
131      * @param helper The {@code PipelineHelper} describing the stream pipeline
132      *               up to this operation
133      * @param spliterator The {@code Spliterator} describing the source for this
134      *                    pipeline
135      */
AbstractTask(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator)136     protected AbstractTask(PipelineHelper<P_OUT> helper,
137                            Spliterator<P_IN> spliterator) {
138         super(null);
139         this.helper = helper;
140         this.spliterator = spliterator;
141         this.targetSize = 0L;
142     }
143 
144     /**
145      * Constructor for non-root nodes.
146      *
147      * @param parent this node's parent task
148      * @param spliterator {@code Spliterator} describing the subtree rooted at
149      *        this node, obtained by splitting the parent {@code Spliterator}
150      */
AbstractTask(K parent, Spliterator<P_IN> spliterator)151     protected AbstractTask(K parent,
152                            Spliterator<P_IN> spliterator) {
153         super(parent);
154         this.spliterator = spliterator;
155         this.helper = parent.helper;
156         this.targetSize = parent.targetSize;
157     }
158 
159     /**
160      * Constructs a new node of type T whose parent is the receiver; must call
161      * the AbstractTask(T, Spliterator) constructor with the receiver and the
162      * provided Spliterator.
163      *
164      * @param spliterator {@code Spliterator} describing the subtree rooted at
165      *        this node, obtained by splitting the parent {@code Spliterator}
166      * @return newly constructed child node
167      */
makeChild(Spliterator<P_IN> spliterator)168     protected abstract K makeChild(Spliterator<P_IN> spliterator);
169 
170     /**
171      * Computes the result associated with a leaf node.  Will be called by
172      * {@code compute()} and the result passed to @{code setLocalResult()}
173      *
174      * @return the computed result of a leaf node
175      */
doLeaf()176     protected abstract R doLeaf();
177 
178     /**
179      * Returns a suggested target leaf size based on the initial size estimate.
180      *
181      * @return suggested target leaf size
182      */
suggestTargetSize(long sizeEstimate)183     public static long suggestTargetSize(long sizeEstimate) {
184         long est = sizeEstimate / LEAF_TARGET;
185         return est > 0L ? est : 1L;
186     }
187 
188     /**
189      * Returns the targetSize, initializing it via the supplied
190      * size estimate if not already initialized.
191      */
getTargetSize(long sizeEstimate)192     protected final long getTargetSize(long sizeEstimate) {
193         long s;
194         return ((s = targetSize) != 0 ? s :
195                 (targetSize = suggestTargetSize(sizeEstimate)));
196     }
197 
198     /**
199      * Returns the local result, if any. Subclasses should use
200      * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
201      * results.  This returns the local result so that calls from within the
202      * fork-join framework will return the correct result.
203      *
204      * @return local result for this node previously stored with
205      * {@link #setLocalResult}
206      */
207     @Override
getRawResult()208     public R getRawResult() {
209         return localResult;
210     }
211 
212     /**
213      * Does nothing; instead, subclasses should use
214      * {@link #setLocalResult(Object)}} to manage results.
215      *
216      * @param result must be null, or an exception is thrown (this is a safety
217      *        tripwire to detect when {@code setRawResult()} is being used
218      *        instead of {@code setLocalResult()}
219      */
220     @Override
setRawResult(R result)221     protected void setRawResult(R result) {
222         if (result != null)
223             throw new IllegalStateException();
224     }
225 
226     /**
227      * Retrieves a result previously stored with {@link #setLocalResult}
228      *
229      * @return local result for this node previously stored with
230      * {@link #setLocalResult}
231      */
getLocalResult()232     protected R getLocalResult() {
233         return localResult;
234     }
235 
236     /**
237      * Associates the result with the task, can be retrieved with
238      * {@link #getLocalResult}
239      *
240      * @param localResult local result for this node
241      */
setLocalResult(R localResult)242     protected void setLocalResult(R localResult) {
243         this.localResult = localResult;
244     }
245 
246     /**
247      * Indicates whether this task is a leaf node.  (Only valid after
248      * {@link #compute} has been called on this node).  If the node is not a
249      * leaf node, then children will be non-null and numChildren will be
250      * positive.
251      *
252      * @return {@code true} if this task is a leaf node
253      */
isLeaf()254     protected boolean isLeaf() {
255         return leftChild == null;
256     }
257 
258     /**
259      * Indicates whether this task is the root node
260      *
261      * @return {@code true} if this task is the root node.
262      */
isRoot()263     protected boolean isRoot() {
264         return getParent() == null;
265     }
266 
267     /**
268      * Returns the parent of this task, or null if this task is the root
269      *
270      * @return the parent of this task, or null if this task is the root
271      */
272     @SuppressWarnings("unchecked")
getParent()273     protected K getParent() {
274         return (K) getCompleter();
275     }
276 
277     /**
278      * Decides whether or not to split a task further or compute it
279      * directly. If computing directly, calls {@code doLeaf} and pass
280      * the result to {@code setRawResult}. Otherwise splits off
281      * subtasks, forking one and continuing as the other.
282      *
283      * <p> The method is structured to conserve resources across a
284      * range of uses.  The loop continues with one of the child tasks
285      * when split, to avoid deep recursion. To cope with spliterators
286      * that may be systematically biased toward left-heavy or
287      * right-heavy splits, we alternate which child is forked versus
288      * continued in the loop.
289      */
290     @Override
compute()291     public void compute() {
292         Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
293         long sizeEstimate = rs.estimateSize();
294         long sizeThreshold = getTargetSize(sizeEstimate);
295         boolean forkRight = false;
296         @SuppressWarnings("unchecked") K task = (K) this;
297         while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
298             K leftChild, rightChild, taskToFork;
299             task.leftChild  = leftChild = task.makeChild(ls);
300             task.rightChild = rightChild = task.makeChild(rs);
301             task.setPendingCount(1);
302             if (forkRight) {
303                 forkRight = false;
304                 rs = ls;
305                 task = leftChild;
306                 taskToFork = rightChild;
307             }
308             else {
309                 forkRight = true;
310                 task = rightChild;
311                 taskToFork = leftChild;
312             }
313             taskToFork.fork();
314             sizeEstimate = rs.estimateSize();
315         }
316         task.setLocalResult(task.doLeaf());
317         task.tryComplete();
318     }
319 
320     /**
321      * {@inheritDoc}
322      *
323      * @implNote
324      * Clears spliterator and children fields.  Overriders MUST call
325      * {@code super.onCompletion} as the last thing they do if they want these
326      * cleared.
327      */
328     @Override
onCompletion(CountedCompleter<?> caller)329     public void onCompletion(CountedCompleter<?> caller) {
330         spliterator = null;
331         leftChild = rightChild = null;
332     }
333 
334     /**
335      * Returns whether this node is a "leftmost" node -- whether the path from
336      * the root to this node involves only traversing leftmost child links.  For
337      * a leaf node, this means it is the first leaf node in the encounter order.
338      *
339      * @return {@code true} if this node is a "leftmost" node
340      */
isLeftmostNode()341     protected boolean isLeftmostNode() {
342         @SuppressWarnings("unchecked")
343         K node = (K) this;
344         while (node != null) {
345             K parent = node.getParent();
346             if (parent != null && parent.leftChild != node)
347                 return false;
348             node = parent;
349         }
350         return true;
351     }
352 }
353