1 /* 2 * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Optional; 28 import java.util.OptionalDouble; 29 import java.util.OptionalInt; 30 import java.util.OptionalLong; 31 import java.util.Spliterator; 32 import java.util.concurrent.CountedCompleter; 33 import java.util.function.Predicate; 34 import java.util.function.Supplier; 35 36 /** 37 * Factory for instances of a short-circuiting {@code TerminalOp} that searches 38 * for an element in a stream pipeline, and terminates when it finds one. 39 * Supported variants include find-first (find the first element in the 40 * encounter order) and find-any (find any element, may not be the first in 41 * encounter order.) 42 * 43 * @since 1.8 44 */ 45 final class FindOps { 46 FindOps()47 private FindOps() { } 48 49 /** 50 * Constructs a {@code TerminalOp} for streams of objects. 51 * 52 * @param <T> the type of elements of the stream 53 * @param mustFindFirst whether the {@code TerminalOp} must produce the 54 * first element in the encounter order 55 * @return a {@code TerminalOp} implementing the find operation 56 */ 57 @SuppressWarnings("unchecked") makeRef(boolean mustFindFirst)58 public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) { 59 return (TerminalOp<T, Optional<T>>) 60 (mustFindFirst ? FindSink.OfRef.OP_FIND_FIRST : FindSink.OfRef.OP_FIND_ANY); 61 } 62 63 /** 64 * Constructs a {@code TerminalOp} for streams of ints. 65 * 66 * @param mustFindFirst whether the {@code TerminalOp} must produce the 67 * first element in the encounter order 68 * @return a {@code TerminalOp} implementing the find operation 69 */ makeInt(boolean mustFindFirst)70 public static TerminalOp<Integer, OptionalInt> makeInt(boolean mustFindFirst) { 71 return mustFindFirst ? FindSink.OfInt.OP_FIND_FIRST : FindSink.OfInt.OP_FIND_ANY; 72 } 73 74 /** 75 * Constructs a {@code TerminalOp} for streams of longs. 76 * 77 * @param mustFindFirst whether the {@code TerminalOp} must produce the 78 * first element in the encounter order 79 * @return a {@code TerminalOp} implementing the find operation 80 */ makeLong(boolean mustFindFirst)81 public static TerminalOp<Long, OptionalLong> makeLong(boolean mustFindFirst) { 82 return mustFindFirst ? FindSink.OfLong.OP_FIND_FIRST : FindSink.OfLong.OP_FIND_ANY; 83 } 84 85 /** 86 * Constructs a {@code FindOp} for streams of doubles. 87 * 88 * @param mustFindFirst whether the {@code TerminalOp} must produce the 89 * first element in the encounter order 90 * @return a {@code TerminalOp} implementing the find operation 91 */ makeDouble(boolean mustFindFirst)92 public static TerminalOp<Double, OptionalDouble> makeDouble(boolean mustFindFirst) { 93 return mustFindFirst ? FindSink.OfDouble.OP_FIND_FIRST : FindSink.OfDouble.OP_FIND_ANY; 94 } 95 96 /** 97 * A short-circuiting {@code TerminalOp} that searches for an element in a 98 * stream pipeline, and terminates when it finds one. Implements both 99 * find-first (find the first element in the encounter order) and find-any 100 * (find any element, may not be the first in encounter order.) 101 * 102 * @param <T> the output type of the stream pipeline 103 * @param <O> the result type of the find operation, typically an optional 104 * type 105 */ 106 private static final class FindOp<T, O> implements TerminalOp<T, O> { 107 private final StreamShape shape; 108 final int opFlags; 109 final O emptyValue; 110 final Predicate<O> presentPredicate; 111 final Supplier<TerminalSink<T, O>> sinkSupplier; 112 113 /** 114 * Constructs a {@code FindOp}. 115 * 116 * @param mustFindFirst if true, must find the first element in 117 * encounter order, otherwise can find any element 118 * @param shape stream shape of elements to search 119 * @param emptyValue result value corresponding to "found nothing" 120 * @param presentPredicate {@code Predicate} on result value 121 * corresponding to "found something" 122 * @param sinkSupplier supplier for a {@code TerminalSink} implementing 123 * the matching functionality 124 */ FindOp(boolean mustFindFirst, StreamShape shape, O emptyValue, Predicate<O> presentPredicate, Supplier<TerminalSink<T, O>> sinkSupplier)125 FindOp(boolean mustFindFirst, 126 StreamShape shape, 127 O emptyValue, 128 Predicate<O> presentPredicate, 129 Supplier<TerminalSink<T, O>> sinkSupplier) { 130 this.opFlags = StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED); 131 this.shape = shape; 132 this.emptyValue = emptyValue; 133 this.presentPredicate = presentPredicate; 134 this.sinkSupplier = sinkSupplier; 135 } 136 137 @Override getOpFlags()138 public int getOpFlags() { 139 return opFlags; 140 } 141 142 @Override inputShape()143 public StreamShape inputShape() { 144 return shape; 145 } 146 147 @Override evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator)148 public <S> O evaluateSequential(PipelineHelper<T> helper, 149 Spliterator<S> spliterator) { 150 O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get(); 151 return result != null ? result : emptyValue; 152 } 153 154 @Override evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator)155 public <P_IN> O evaluateParallel(PipelineHelper<T> helper, 156 Spliterator<P_IN> spliterator) { 157 // This takes into account the upstream ops flags and the terminal 158 // op flags and therefore takes into account findFirst or findAny 159 boolean mustFindFirst = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); 160 return new FindTask<>(this, mustFindFirst, helper, spliterator).invoke(); 161 } 162 } 163 164 /** 165 * Implementation of @{code TerminalSink} that implements the find 166 * functionality, requesting cancellation when something has been found 167 * 168 * @param <T> The type of input element 169 * @param <O> The result type, typically an optional type 170 */ 171 private abstract static class FindSink<T, O> implements TerminalSink<T, O> { 172 boolean hasValue; 173 T value; 174 FindSink()175 FindSink() {} // Avoid creation of special accessor 176 177 @Override accept(T value)178 public void accept(T value) { 179 if (!hasValue) { 180 hasValue = true; 181 this.value = value; 182 } 183 } 184 185 @Override cancellationRequested()186 public boolean cancellationRequested() { 187 return hasValue; 188 } 189 190 /** Specialization of {@code FindSink} for reference streams */ 191 static final class OfRef<T> extends FindSink<T, Optional<T>> { 192 @Override get()193 public Optional<T> get() { 194 return hasValue ? Optional.of(value) : null; 195 } 196 197 static final TerminalOp<?, ?> OP_FIND_FIRST = new FindOp<>(true, 198 StreamShape.REFERENCE, Optional.empty(), 199 Optional::isPresent, FindSink.OfRef::new); 200 201 static final TerminalOp<?, ?> OP_FIND_ANY = new FindOp<>(false, 202 StreamShape.REFERENCE, Optional.empty(), 203 Optional::isPresent, FindSink.OfRef::new); 204 } 205 206 /** Specialization of {@code FindSink} for int streams */ 207 static final class OfInt extends FindSink<Integer, OptionalInt> 208 implements Sink.OfInt { 209 @Override accept(int value)210 public void accept(int value) { 211 // Boxing is OK here, since few values will actually flow into the sink 212 accept((Integer) value); 213 } 214 215 @Override get()216 public OptionalInt get() { 217 return hasValue ? OptionalInt.of(value) : null; 218 } 219 220 static final TerminalOp<Integer, OptionalInt> OP_FIND_FIRST = new FindOp<>(true, 221 StreamShape.INT_VALUE, OptionalInt.empty(), 222 OptionalInt::isPresent, FindSink.OfInt::new); 223 static final TerminalOp<Integer, OptionalInt> OP_FIND_ANY = new FindOp<>(false, 224 StreamShape.INT_VALUE, OptionalInt.empty(), 225 OptionalInt::isPresent, FindSink.OfInt::new); 226 } 227 228 /** Specialization of {@code FindSink} for long streams */ 229 static final class OfLong extends FindSink<Long, OptionalLong> 230 implements Sink.OfLong { 231 @Override accept(long value)232 public void accept(long value) { 233 // Boxing is OK here, since few values will actually flow into the sink 234 accept((Long) value); 235 } 236 237 @Override get()238 public OptionalLong get() { 239 return hasValue ? OptionalLong.of(value) : null; 240 } 241 242 static final TerminalOp<Long, OptionalLong> OP_FIND_FIRST = new FindOp<>(true, 243 StreamShape.LONG_VALUE, OptionalLong.empty(), 244 OptionalLong::isPresent, FindSink.OfLong::new); 245 static final TerminalOp<Long, OptionalLong> OP_FIND_ANY = new FindOp<>(false, 246 StreamShape.LONG_VALUE, OptionalLong.empty(), 247 OptionalLong::isPresent, FindSink.OfLong::new); 248 } 249 250 /** Specialization of {@code FindSink} for double streams */ 251 static final class OfDouble extends FindSink<Double, OptionalDouble> 252 implements Sink.OfDouble { 253 @Override accept(double value)254 public void accept(double value) { 255 // Boxing is OK here, since few values will actually flow into the sink 256 accept((Double) value); 257 } 258 259 @Override get()260 public OptionalDouble get() { 261 return hasValue ? OptionalDouble.of(value) : null; 262 } 263 264 static final TerminalOp<Double, OptionalDouble> OP_FIND_FIRST = new FindOp<>(true, 265 StreamShape.DOUBLE_VALUE, OptionalDouble.empty(), 266 OptionalDouble::isPresent, FindSink.OfDouble::new); 267 static final TerminalOp<Double, OptionalDouble> OP_FIND_ANY = new FindOp<>(false, 268 StreamShape.DOUBLE_VALUE, OptionalDouble.empty(), 269 OptionalDouble::isPresent, FindSink.OfDouble::new); 270 } 271 } 272 273 /** 274 * {@code ForkJoinTask} implementing parallel short-circuiting search 275 * @param <P_IN> Input element type to the stream pipeline 276 * @param <P_OUT> Output element type from the stream pipeline 277 * @param <O> Result type from the find operation 278 */ 279 @SuppressWarnings("serial") 280 private static final class FindTask<P_IN, P_OUT, O> 281 extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> { 282 private final FindOp<P_OUT, O> op; 283 private final boolean mustFindFirst; 284 FindTask(FindOp<P_OUT, O> op, boolean mustFindFirst, PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator)285 FindTask(FindOp<P_OUT, O> op, 286 boolean mustFindFirst, 287 PipelineHelper<P_OUT> helper, 288 Spliterator<P_IN> spliterator) { 289 super(helper, spliterator); 290 this.mustFindFirst = mustFindFirst; 291 this.op = op; 292 } 293 FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator)294 FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator) { 295 super(parent, spliterator); 296 this.mustFindFirst = parent.mustFindFirst; 297 this.op = parent.op; 298 } 299 300 @Override makeChild(Spliterator<P_IN> spliterator)301 protected FindTask<P_IN, P_OUT, O> makeChild(Spliterator<P_IN> spliterator) { 302 return new FindTask<>(this, spliterator); 303 } 304 305 @Override getEmptyResult()306 protected O getEmptyResult() { 307 return op.emptyValue; 308 } 309 foundResult(O answer)310 private void foundResult(O answer) { 311 if (isLeftmostNode()) 312 shortCircuit(answer); 313 else 314 cancelLaterNodes(); 315 } 316 317 @Override doLeaf()318 protected O doLeaf() { 319 O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get(); 320 if (!mustFindFirst) { 321 if (result != null) 322 shortCircuit(result); 323 return null; 324 } 325 else { 326 if (result != null) { 327 foundResult(result); 328 return result; 329 } 330 else 331 return null; 332 } 333 } 334 335 @Override onCompletion(CountedCompleter<?> caller)336 public void onCompletion(CountedCompleter<?> caller) { 337 if (mustFindFirst) { 338 for (FindTask<P_IN, P_OUT, O> child = leftChild, p = null; child != p; 339 p = child, child = rightChild) { 340 O result = child.getLocalResult(); 341 if (result != null && op.presentPredicate.test(result)) { 342 setLocalResult(result); 343 foundResult(result); 344 break; 345 } 346 } 347 } 348 super.onCompletion(caller); 349 } 350 } 351 } 352 353