1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Spliterator;
28 import java.util.concurrent.atomic.AtomicReference;
29 
30 /**
31  * Abstract class for fork-join tasks used to implement short-circuiting
32  * stream ops, which can produce a result without processing all elements of the
33  * stream.
34  *
35  * @param <P_IN> type of input elements to the pipeline
36  * @param <P_OUT> type of output elements from the pipeline
37  * @param <R> type of intermediate result, may be different from operation
38  *        result type
39  * @param <K> type of child and sibling tasks
40  * @since 1.8
41  */
42 @SuppressWarnings("serial")
43 abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,
44                                         K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>>
45         extends AbstractTask<P_IN, P_OUT, R, K> {
46     /**
47      * The result for this computation; this is shared among all tasks and set
48      * exactly once
49      */
50     protected final AtomicReference<R> sharedResult;
51 
52     /**
53      * Indicates whether this task has been canceled.  Tasks may cancel other
54      * tasks in the computation under various conditions, such as in a
55      * find-first operation, a task that finds a value will cancel all tasks
56      * that are later in the encounter order.
57      */
58     protected volatile boolean canceled;
59 
60     /**
61      * Constructor for root tasks.
62      *
63      * @param helper the {@code PipelineHelper} describing the stream pipeline
64      *               up to this operation
65      * @param spliterator the {@code Spliterator} describing the source for this
66      *                    pipeline
67      */
AbstractShortCircuitTask(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator)68     protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper,
69                                        Spliterator<P_IN> spliterator) {
70         super(helper, spliterator);
71         sharedResult = new AtomicReference<>();
72     }
73 
74     /**
75      * Constructor for non-root nodes.
76      *
77      * @param parent parent task in the computation tree
78      * @param spliterator the {@code Spliterator} for the portion of the
79      *                    computation tree described by this task
80      */
AbstractShortCircuitTask(K parent, Spliterator<P_IN> spliterator)81     protected AbstractShortCircuitTask(K parent,
82                                        Spliterator<P_IN> spliterator) {
83         super(parent, spliterator);
84         sharedResult = parent.sharedResult;
85     }
86 
87     /**
88      * Returns the value indicating the computation completed with no task
89      * finding a short-circuitable result.  For example, for a "find" operation,
90      * this might be null or an empty {@code Optional}.
91      *
92      * @return the result to return when no task finds a result
93      */
getEmptyResult()94     protected abstract R getEmptyResult();
95 
96     /**
97      * Overrides AbstractTask version to include checks for early
98      * exits while splitting or computing.
99      */
100     @Override
compute()101     public void compute() {
102         Spliterator<P_IN> rs = spliterator, ls;
103         long sizeEstimate = rs.estimateSize();
104         long sizeThreshold = getTargetSize(sizeEstimate);
105         boolean forkRight = false;
106         @SuppressWarnings("unchecked") K task = (K) this;
107         AtomicReference<R> sr = sharedResult;
108         R result;
109         while ((result = sr.get()) == null) {
110             if (task.taskCanceled()) {
111                 result = task.getEmptyResult();
112                 break;
113             }
114             if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {
115                 result = task.doLeaf();
116                 break;
117             }
118             K leftChild, rightChild, taskToFork;
119             task.leftChild  = leftChild = task.makeChild(ls);
120             task.rightChild = rightChild = task.makeChild(rs);
121             task.setPendingCount(1);
122             if (forkRight) {
123                 forkRight = false;
124                 rs = ls;
125                 task = leftChild;
126                 taskToFork = rightChild;
127             }
128             else {
129                 forkRight = true;
130                 task = rightChild;
131                 taskToFork = leftChild;
132             }
133             taskToFork.fork();
134             sizeEstimate = rs.estimateSize();
135         }
136         task.setLocalResult(result);
137         task.tryComplete();
138     }
139 
140 
141     /**
142      * Declares that a globally valid result has been found.  If another task has
143      * not already found the answer, the result is installed in
144      * {@code sharedResult}.  The {@code compute()} method will check
145      * {@code sharedResult} before proceeding with computation, so this causes
146      * the computation to terminate early.
147      *
148      * @param result the result found
149      */
shortCircuit(R result)150     protected void shortCircuit(R result) {
151         if (result != null)
152             sharedResult.compareAndSet(null, result);
153     }
154 
155     /**
156      * Sets a local result for this task.  If this task is the root, set the
157      * shared result instead (if not already set).
158      *
159      * @param localResult The result to set for this task
160      */
161     @Override
setLocalResult(R localResult)162     protected void setLocalResult(R localResult) {
163         if (isRoot()) {
164             if (localResult != null)
165                 sharedResult.compareAndSet(null, localResult);
166         }
167         else
168             super.setLocalResult(localResult);
169     }
170 
171     /**
172      * Retrieves the local result for this task
173      */
174     @Override
getRawResult()175     public R getRawResult() {
176         return getLocalResult();
177     }
178 
179     /**
180      * Retrieves the local result for this task.  If this task is the root,
181      * retrieves the shared result instead.
182      */
183     @Override
getLocalResult()184     public R getLocalResult() {
185         if (isRoot()) {
186             R answer = sharedResult.get();
187             return (answer == null) ? getEmptyResult() : answer;
188         }
189         else
190             return super.getLocalResult();
191     }
192 
193     /**
194      * Mark this task as canceled
195      */
cancel()196     protected void cancel() {
197         canceled = true;
198     }
199 
200     /**
201      * Queries whether this task is canceled.  A task is considered canceled if
202      * it or any of its parents have been canceled.
203      *
204      * @return {@code true} if this task or any parent is canceled.
205      */
taskCanceled()206     protected boolean taskCanceled() {
207         boolean cancel = canceled;
208         if (!cancel) {
209             for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent())
210                 cancel = parent.canceled;
211         }
212 
213         return cancel;
214     }
215 
216     /**
217      * Cancels all tasks which succeed this one in the encounter order.  This
218      * includes canceling all the current task's right sibling, as well as the
219      * later right siblings of all its parents.
220      */
cancelLaterNodes()221     protected void cancelLaterNodes() {
222         // Go up the tree, cancel right siblings of this node and all parents
223         for (@SuppressWarnings("unchecked") K parent = getParent(), node = (K) this;
224              parent != null;
225              node = parent, parent = parent.getParent()) {
226             // If node is a left child of parent, then has a right sibling
227             if (parent.leftChild == node) {
228                 K rightSibling = parent.rightChild;
229                 if (!rightSibling.canceled)
230                     rightSibling.cancel();
231             }
232         }
233     }
234 }
235