/*
 * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountedCompleter;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.LongConsumer;

/**
 * Factory for creating instances of {@code TerminalOp} that perform an
 * action for every element of a stream.  Supported variants include unordered
 * traversal (elements are provided to the {@code Consumer} as soon as they are
 * available), and ordered traversal (elements are provided to the
 * {@code Consumer} in encounter order.)
 *
 * <p>Elements are provided to the {@code Consumer} on whatever thread and
 * in whatever order they become available.  For ordered traversals, it is
 * guaranteed that processing an element <em>happens-before</em> processing
 * subsequent elements in the encounter order.
 *
 * <p>Exceptions occurring as a result of sending an element to the
 * {@code Consumer} will be relayed to the caller and traversal will be
 * prematurely terminated.
 *
 * @since 1.8
 */
final class ForEachOps {

    /** Not instantiable; this class only hosts static factories and nested types. */
    private ForEachOps() { }

    /**
     * Constructs a {@code TerminalOp} that performs an action for every element
     * of a stream.
     *
     * @param action the {@code Consumer} that receives all elements of a
     *        stream
     * @param ordered whether an ordered traversal is requested
     * @param <T> the type of the stream elements
     * @return the {@code TerminalOp} instance
     * @throws NullPointerException if {@code action} is {@code null}
     */
    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }

    /**
     * Constructs a {@code TerminalOp} that performs an action for every element
     * of an {@code IntStream}.
     *
     * @param action the {@code IntConsumer} that receives all elements of a
     *        stream
     * @param ordered whether an ordered traversal is requested
     * @return the {@code TerminalOp} instance
     * @throws NullPointerException if {@code action} is {@code null}
     */
    public static TerminalOp<Integer, Void> makeInt(IntConsumer action,
                                                    boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfInt(action, ordered);
    }

    /**
     * Constructs a {@code TerminalOp} that performs an action for every element
     * of a {@code LongStream}.
     *
     * @param action the {@code LongConsumer} that receives all elements of a
     *        stream
     * @param ordered whether an ordered traversal is requested
     * @return the {@code TerminalOp} instance
     * @throws NullPointerException if {@code action} is {@code null}
     */
    public static TerminalOp<Long, Void> makeLong(LongConsumer action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfLong(action, ordered);
    }

    /**
     * Constructs a {@code TerminalOp} that performs an action for every element
     * of a {@code DoubleStream}.
     *
     * @param action the {@code DoubleConsumer} that receives all elements of
     *        a stream
     * @param ordered whether an ordered traversal is requested
     * @return the {@code TerminalOp} instance
     * @throws NullPointerException if {@code action} is {@code null}
     */
    public static TerminalOp<Double, Void> makeDouble(DoubleConsumer action,
                                                      boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfDouble(action, ordered);
    }

    /**
     * A {@code TerminalOp} that evaluates a stream pipeline and sends the
     * output to itself as a {@code TerminalSink}.  Elements will be sent on
     * whatever thread they become available.  If the traversal is unordered,
     * they will be sent independent of the stream's encounter order.
     *
     * <p>This terminal operation is stateless.  For parallel evaluation, each
     * leaf instance of a {@code ForEachTask} will send elements to the same
     * {@code TerminalSink} reference that is an instance of this class.
     *
     * @param <T> the output type of the stream pipeline
     */
    abstract static class ForEachOp<T>
            implements TerminalOp<T, Void>, TerminalSink<T, Void> {
        /** Whether elements must be delivered in encounter order. */
        private final boolean ordered;

        /**
         * @param ordered whether an ordered traversal is requested
         */
        protected ForEachOp(boolean ordered) {
            this.ordered = ordered;
        }

        // TerminalOp

        /**
         * Returns {@code 0} for an ordered traversal, otherwise
         * {@code StreamOpFlag.NOT_ORDERED}.
         */
        @Override
        public int getOpFlags() {
            return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
        }

        /**
         * Pushes every element of the pipeline into this sink on the calling
         * thread and returns the sink's (always {@code null}) result.
         */
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

        /**
         * Runs the traversal as a fork/join computation: a
         * {@code ForEachOrderedTask} when ordering was requested, otherwise a
         * {@code ForEachTask} feeding the wrapped sink directly.
         */
        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<S> spliterator) {
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

        // TerminalSink

        /** There is no result to report for a for-each; always {@code null}. */
        @Override
        public Void get() {
            return null;
        }

        // Implementations

        /** Implementation class for reference streams */
        static final class OfRef<T> extends ForEachOp<T> {
            final Consumer<? super T> consumer;

            OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public void accept(T t) {
                consumer.accept(t);
            }
        }

        /** Implementation class for {@code IntStream} */
        static final class OfInt extends ForEachOp<Integer>
                implements Sink.OfInt {
            final IntConsumer consumer;

            OfInt(IntConsumer consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public StreamShape inputShape() {
                return StreamShape.INT_VALUE;
            }

            @Override
            public void accept(int t) {
                consumer.accept(t);
            }
        }

        /** Implementation class for {@code LongStream} */
        static final class OfLong extends ForEachOp<Long>
                implements Sink.OfLong {
            final LongConsumer consumer;

            OfLong(LongConsumer consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public StreamShape inputShape() {
                return StreamShape.LONG_VALUE;
            }

            @Override
            public void accept(long t) {
                consumer.accept(t);
            }
        }

        /** Implementation class for {@code DoubleStream} */
        static final class OfDouble extends ForEachOp<Double>
                implements Sink.OfDouble {
            final DoubleConsumer consumer;

            OfDouble(DoubleConsumer consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public StreamShape inputShape() {
                return StreamShape.DOUBLE_VALUE;
            }

            @Override
            public void accept(double t) {
                consumer.accept(t);
            }
        }
    }

    /** A {@code ForkJoinTask} for performing a parallel for-each operation */
    @SuppressWarnings("serial")
    static final class ForEachTask<S, T> extends CountedCompleter<Void> {
        private Spliterator<S> spliterator;
        private final Sink<S> sink;
        private final PipelineHelper<T> helper;
        // 0 means "not yet computed"; resolved lazily on first compute()
        private long targetSize;

        /**
         * Creates the root task.
         *
         * @param helper the pipeline helper used to push elements into the sink
         * @param spliterator the source covering all elements to traverse
         * @param sink the sink that receives every element
         */
        ForEachTask(PipelineHelper<T> helper,
                    Spliterator<S> spliterator,
                    Sink<S> sink) {
            super(null);
            this.sink = sink;
            this.helper = helper;
            this.spliterator = spliterator;
            this.targetSize = 0L;
        }

        /**
         * Creates a child task that shares the parent's sink, helper and
         * target size but traverses its own portion of the source.
         *
         * @param parent the task this one was split from
         * @param spliterator the portion of the source this task covers
         */
        ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
            super(parent);
            this.spliterator = spliterator;
            this.sink = parent.sink;
            this.targetSize = parent.targetSize;
            this.helper = parent.helper;
        }

        // Similar to AbstractTask but doesn't need to track child tasks
        /**
         * Repeatedly splits the source, forking one half and continuing with
         * the other, until the remaining portion is at or below the target
         * size or cannot be split; that portion is then copied into the sink.
         * For short-circuiting pipelines the loop stops as soon as the sink
         * requests cancellation.
         */
        @Override
        public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    // Leaf: small enough or unsplittable, so traverse directly
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                // Alternate which side is forked and which is kept on each
                // iteration
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                taskToFork.fork();
                sizeEstimate = rightSplit.estimateSize();
            }
            // Drop the reference to the source once this task is done with it
            task.spliterator = null;
            task.propagateCompletion();
        }
    }

    /**
     * A {@code ForkJoinTask} for performing a parallel for-each operation
     * which visits the elements in encounter order
     */
    @SuppressWarnings("serial")
    static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
        /*
         * Our goal is to ensure that the elements associated with a task are
         * processed according to an in-order traversal of the computation tree.
         * We use completion counts for representing these dependencies, so that
         * a task does not complete until all the tasks preceding it in this
         * order complete.  We use the "completion map" to associate the next
         * task in this order for any left child.  We increase the pending count
         * of any node on the right side of such a mapping by one to indicate
         * its dependency, and when a node on the left side of such a mapping
         * completes, it decrements the pending count of its corresponding right
         * side.  As the computation tree is expanded by splitting, we must
         * atomically update the mappings to maintain the invariant that the
         * completion map maps left children to the next node in the in-order
         * traversal.
         *
         * Take, for example, the following computation tree of tasks:
         *
         *       a
         *      / \
         *     b   c
         *    / \ / \
         *   d  e f  g
         *
         * The completion map will contain (not necessarily all at the same
         * time) the following associations:
         *
         *   d -> e
         *   b -> f
         *   f -> g
         *
         * Tasks e, f, g will have their pending counts increased by 1.
         *
         * The following relationships hold:
         *
         *   - completion of d "happens-before" e;
         *   - completion of d and e "happens-before" b;
         *   - completion of b "happens-before" f; and
         *   - completion of f "happens-before" g
         *
         * Thus overall the "happens-before" relationship holds for the
         * reporting of elements, covered by tasks d, e, f and g, as specified
         * by the forEachOrdered operation.
         */

        private final PipelineHelper<T> helper;
        private Spliterator<S> spliterator;
        private final long targetSize;
        // Shared by every task in the tree (children copy the parent's reference)
        private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
        private final Sink<T> action;
        // null for the root and for tasks on the left spine of the tree
        private final ForEachOrderedTask<S, T> leftPredecessor;
        // Elements buffered by a leaf that could not complete immediately
        private Node<T> node;

        /**
         * Creates the root task, which computes the target size and allocates
         * the completion map shared by the whole computation tree.
         *
         * @param helper the pipeline helper used to evaluate the pipeline
         * @param spliterator the source covering all elements to traverse
         * @param action the sink that receives the elements
         */
        protected ForEachOrderedTask(PipelineHelper<T> helper,
                                     Spliterator<S> spliterator,
                                     Sink<T> action) {
            super(null);
            this.helper = helper;
            this.spliterator = spliterator;
            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
            // Size map to avoid concurrent re-sizes
            this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.getLeafTarget() << 1));
            this.action = action;
            this.leftPredecessor = null;
        }

        /**
         * Creates a child task that shares the parent's helper, target size,
         * completion map and action.
         *
         * @param parent the task this one was split from
         * @param spliterator the portion of the source this task covers
         * @param leftPredecessor the task recorded as this task's
         *        left-predecessor, or {@code null} if there is none
         */
        ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
                           Spliterator<S> spliterator,
                           ForEachOrderedTask<S, T> leftPredecessor) {
            super(parent);
            this.helper = parent.helper;
            this.spliterator = spliterator;
            this.targetSize = parent.targetSize;
            this.completionMap = parent.completionMap;
            this.action = parent.action;
            this.leftPredecessor = leftPredecessor;
        }

        @Override
        public final void compute() {
            doCompute(this);
        }

        /**
         * Splits {@code task} into left/right children while its source is
         * larger than the target size and can still be split, wiring up the
         * pending counts and completion-map entries that encode the in-order
         * dependencies, then handles the final (leaf) task.
         *
         * @param task the task to compute
         */
        private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
            Spliterator<S> rightSplit = task.spliterator, leftSplit;
            long sizeThreshold = task.targetSize;
            boolean forkRight = false;
            while (rightSplit.estimateSize() > sizeThreshold &&
                   (leftSplit = rightSplit.trySplit()) != null) {
                ForEachOrderedTask<S, T> leftChild =
                    new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
                ForEachOrderedTask<S, T> rightChild =
                    new ForEachOrderedTask<>(task, rightSplit, leftChild);

                // Fork the parent task
                // Completion of the left and right children "happens-before"
                // completion of the parent
                task.addToPendingCount(1);
                // Completion of the left child "happens-before" completion of
                // the right child
                rightChild.addToPendingCount(1);
                task.completionMap.put(leftChild, rightChild);

                // If task is not on the left spine
                if (task.leftPredecessor != null) {
                    /*
                     * Completion of left-predecessor, or left subtree,
                     * "happens-before" completion of left-most leaf node of
                     * right subtree.
                     * The left child's pending count needs to be updated before
                     * it is associated in the completion map, otherwise the
                     * left child can complete prematurely and violate the
                     * "happens-before" constraint.
                     */
                    leftChild.addToPendingCount(1);
                    // Update association of left-predecessor to left-most
                    // leaf node of right subtree
                    if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
                        // If replaced, adjust the pending count of the parent
                        // to complete when its children complete
                        task.addToPendingCount(-1);
                    } else {
                        // Left-predecessor has already completed, parent's
                        // pending count is adjusted by left-predecessor;
                        // left child is ready to complete
                        leftChild.addToPendingCount(-1);
                    }
                }

                ForEachOrderedTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    task = leftChild;
                    taskToFork = rightChild;
                }
                else {
                    forkRight = true;
                    task = rightChild;
                    taskToFork = leftChild;
                }
                taskToFork.fork();
            }

            /*
             * Task's pending count is either 0 or 1.  If 1 then the completion
             * map will contain a value that is task, and two calls to
             * tryComplete are required for completion, one below and one
             * triggered by the completion of task's left-predecessor in
             * onCompletion.  Therefore there is no data race within the if
             * block.
             */
            if (task.getPendingCount() > 0) {
                // Cannot complete just yet so buffer elements into a Node
                // for use when completion occurs
                @SuppressWarnings("unchecked")
                IntFunction<T[]> generator = size -> (T[]) new Object[size];
                Node.Builder<T> nb = task.helper.makeNodeBuilder(
                        task.helper.exactOutputSizeIfKnown(rightSplit),
                        generator);
                task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
                task.spliterator = null;
            }
            task.tryComplete();
        }

        /**
         * Sends this task's elements to the action sink, either from the
         * buffered {@code node} or straight from the remaining spliterator,
         * and then calls {@code tryComplete} on the task (if any) that the
         * completion map associates with this one.
         *
         * @param caller the completer passed to this callback; not used here
         */
        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (node != null) {
                // Dump buffered elements from this leaf into the sink
                node.forEach(action);
                node = null;
            }
            else if (spliterator != null) {
                // Dump elements output from this leaf's pipeline into the sink
                helper.wrapAndCopyInto(action, spliterator);
                spliterator = null;
            }

            // The completion of this task *and* the dumping of elements
            // "happens-before" completion of the associated left-most leaf task
            // of right subtree (if any, which can be this task's right sibling)
            //
            ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this);
            if (leftDescendant != null)
                leftDescendant.tryComplete();
        }
    }
}