1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Objects; 28 import java.util.Spliterator; 29 import java.util.concurrent.ConcurrentHashMap; 30 import java.util.concurrent.CountedCompleter; 31 import java.util.concurrent.ForkJoinTask; 32 import java.util.function.Consumer; 33 import java.util.function.DoubleConsumer; 34 import java.util.function.IntConsumer; 35 import java.util.function.IntFunction; 36 import java.util.function.LongConsumer; 37 38 /** 39 * Factory for creating instances of {@code TerminalOp} that perform an 40 * action for every element of a stream. Supported variants include unordered 41 * traversal (elements are provided to the {@code Consumer} as soon as they are 42 * available), and ordered traversal (elements are provided to the 43 * {@code Consumer} in encounter order.) 44 * 45 * <p>Elements are provided to the {@code Consumer} on whatever thread and 46 * whatever order they become available. For ordered traversals, it is 47 * guaranteed that processing an element <em>happens-before</em> processing 48 * subsequent elements in the encounter order. 49 * 50 * <p>Exceptions occurring as a result of sending an element to the 51 * {@code Consumer} will be relayed to the caller and traversal will be 52 * prematurely terminated. 53 * 54 * @since 1.8 55 */ 56 final class ForEachOps { 57 ForEachOps()58 private ForEachOps() { } 59 60 /** 61 * Constructs a {@code TerminalOp} that perform an action for every element 62 * of a stream. 63 * 64 * @param action the {@code Consumer} that receives all elements of a 65 * stream 66 * @param ordered whether an ordered traversal is requested 67 * @param <T> the type of the stream elements 68 * @return the {@code TerminalOp} instance 69 */ makeRef(Consumer<? super T> action, boolean ordered)70 public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, 71 boolean ordered) { 72 Objects.requireNonNull(action); 73 return new ForEachOp.OfRef<>(action, ordered); 74 } 75 76 /** 77 * Constructs a {@code TerminalOp} that perform an action for every element 78 * of an {@code IntStream}. 79 * 80 * @param action the {@code IntConsumer} that receives all elements of a 81 * stream 82 * @param ordered whether an ordered traversal is requested 83 * @return the {@code TerminalOp} instance 84 */ makeInt(IntConsumer action, boolean ordered)85 public static TerminalOp<Integer, Void> makeInt(IntConsumer action, 86 boolean ordered) { 87 Objects.requireNonNull(action); 88 return new ForEachOp.OfInt(action, ordered); 89 } 90 91 /** 92 * Constructs a {@code TerminalOp} that perform an action for every element 93 * of a {@code LongStream}. 94 * 95 * @param action the {@code LongConsumer} that receives all elements of a 96 * stream 97 * @param ordered whether an ordered traversal is requested 98 * @return the {@code TerminalOp} instance 99 */ makeLong(LongConsumer action, boolean ordered)100 public static TerminalOp<Long, Void> makeLong(LongConsumer action, 101 boolean ordered) { 102 Objects.requireNonNull(action); 103 return new ForEachOp.OfLong(action, ordered); 104 } 105 106 /** 107 * Constructs a {@code TerminalOp} that perform an action for every element 108 * of a {@code DoubleStream}. 109 * 110 * @param action the {@code DoubleConsumer} that receives all elements of 111 * a stream 112 * @param ordered whether an ordered traversal is requested 113 * @return the {@code TerminalOp} instance 114 */ makeDouble(DoubleConsumer action, boolean ordered)115 public static TerminalOp<Double, Void> makeDouble(DoubleConsumer action, 116 boolean ordered) { 117 Objects.requireNonNull(action); 118 return new ForEachOp.OfDouble(action, ordered); 119 } 120 121 /** 122 * A {@code TerminalOp} that evaluates a stream pipeline and sends the 123 * output to itself as a {@code TerminalSink}. Elements will be sent in 124 * whatever thread they become available. If the traversal is unordered, 125 * they will be sent independent of the stream's encounter order. 126 * 127 * <p>This terminal operation is stateless. For parallel evaluation, each 128 * leaf instance of a {@code ForEachTask} will send elements to the same 129 * {@code TerminalSink} reference that is an instance of this class. 130 * 131 * @param <T> the output type of the stream pipeline 132 */ 133 static abstract class ForEachOp<T> 134 implements TerminalOp<T, Void>, TerminalSink<T, Void> { 135 private final boolean ordered; 136 ForEachOp(boolean ordered)137 protected ForEachOp(boolean ordered) { 138 this.ordered = ordered; 139 } 140 141 // TerminalOp 142 143 @Override getOpFlags()144 public int getOpFlags() { 145 return ordered ? 0 : StreamOpFlag.NOT_ORDERED; 146 } 147 148 @Override evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator)149 public <S> Void evaluateSequential(PipelineHelper<T> helper, 150 Spliterator<S> spliterator) { 151 return helper.wrapAndCopyInto(this, spliterator).get(); 152 } 153 154 @Override evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator)155 public <S> Void evaluateParallel(PipelineHelper<T> helper, 156 Spliterator<S> spliterator) { 157 if (ordered) 158 new ForEachOrderedTask<>(helper, spliterator, this).invoke(); 159 else 160 new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke(); 161 return null; 162 } 163 164 // TerminalSink 165 166 @Override get()167 public Void get() { 168 return null; 169 } 170 171 // Implementations 172 173 /** Implementation class for reference streams */ 174 static final class OfRef<T> extends ForEachOp<T> { 175 final Consumer<? super T> consumer; 176 OfRef(Consumer<? super T> consumer, boolean ordered)177 OfRef(Consumer<? super T> consumer, boolean ordered) { 178 super(ordered); 179 this.consumer = consumer; 180 } 181 182 @Override accept(T t)183 public void accept(T t) { 184 consumer.accept(t); 185 } 186 } 187 188 /** Implementation class for {@code IntStream} */ 189 static final class OfInt extends ForEachOp<Integer> 190 implements Sink.OfInt { 191 final IntConsumer consumer; 192 OfInt(IntConsumer consumer, boolean ordered)193 OfInt(IntConsumer consumer, boolean ordered) { 194 super(ordered); 195 this.consumer = consumer; 196 } 197 198 @Override inputShape()199 public StreamShape inputShape() { 200 return StreamShape.INT_VALUE; 201 } 202 203 @Override accept(int t)204 public void accept(int t) { 205 consumer.accept(t); 206 } 207 } 208 209 /** Implementation class for {@code LongStream} */ 210 static final class OfLong extends ForEachOp<Long> 211 implements Sink.OfLong { 212 final LongConsumer consumer; 213 OfLong(LongConsumer consumer, boolean ordered)214 OfLong(LongConsumer consumer, boolean ordered) { 215 super(ordered); 216 this.consumer = consumer; 217 } 218 219 @Override inputShape()220 public StreamShape inputShape() { 221 return StreamShape.LONG_VALUE; 222 } 223 224 @Override accept(long t)225 public void accept(long t) { 226 consumer.accept(t); 227 } 228 } 229 230 /** Implementation class for {@code DoubleStream} */ 231 static final class OfDouble extends ForEachOp<Double> 232 implements Sink.OfDouble { 233 final DoubleConsumer consumer; 234 OfDouble(DoubleConsumer consumer, boolean ordered)235 OfDouble(DoubleConsumer consumer, boolean ordered) { 236 super(ordered); 237 this.consumer = consumer; 238 } 239 240 @Override inputShape()241 public StreamShape inputShape() { 242 return StreamShape.DOUBLE_VALUE; 243 } 244 245 @Override accept(double t)246 public void accept(double t) { 247 consumer.accept(t); 248 } 249 } 250 } 251 252 /** A {@code ForkJoinTask} for performing a parallel for-each operation */ 253 @SuppressWarnings("serial") 254 static final class ForEachTask<S, T> extends CountedCompleter<Void> { 255 private Spliterator<S> spliterator; 256 private final Sink<S> sink; 257 private final PipelineHelper<T> helper; 258 private long targetSize; 259 ForEachTask(PipelineHelper<T> helper, Spliterator<S> spliterator, Sink<S> sink)260 ForEachTask(PipelineHelper<T> helper, 261 Spliterator<S> spliterator, 262 Sink<S> sink) { 263 super(null); 264 this.sink = sink; 265 this.helper = helper; 266 this.spliterator = spliterator; 267 this.targetSize = 0L; 268 } 269 ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator)270 ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) { 271 super(parent); 272 this.spliterator = spliterator; 273 this.sink = parent.sink; 274 this.targetSize = parent.targetSize; 275 this.helper = parent.helper; 276 } 277 278 // Similar to AbstractTask but doesn't need to track child tasks compute()279 public void compute() { 280 Spliterator<S> rightSplit = spliterator, leftSplit; 281 long sizeEstimate = rightSplit.estimateSize(), sizeThreshold; 282 if ((sizeThreshold = targetSize) == 0L) 283 targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate); 284 boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags()); 285 boolean forkRight = false; 286 Sink<S> taskSink = sink; 287 ForEachTask<S, T> task = this; 288 while (!isShortCircuit || !taskSink.cancellationRequested()) { 289 if (sizeEstimate <= sizeThreshold || 290 (leftSplit = rightSplit.trySplit()) == null) { 291 task.helper.copyInto(taskSink, rightSplit); 292 break; 293 } 294 ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit); 295 task.addToPendingCount(1); 296 ForEachTask<S, T> taskToFork; 297 if (forkRight) { 298 forkRight = false; 299 rightSplit = leftSplit; 300 taskToFork = task; 301 task = leftTask; 302 } 303 else { 304 forkRight = true; 305 taskToFork = leftTask; 306 } 307 taskToFork.fork(); 308 sizeEstimate = rightSplit.estimateSize(); 309 } 310 task.spliterator = null; 311 task.propagateCompletion(); 312 } 313 } 314 315 /** 316 * A {@code ForkJoinTask} for performing a parallel for-each operation 317 * which visits the elements in encounter order 318 */ 319 @SuppressWarnings("serial") 320 static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> { 321 /* 322 * Our goal is to ensure that the elements associated with a task are 323 * processed according to an in-order traversal of the computation tree. 324 * We use completion counts for representing these dependencies, so that 325 * a task does not complete until all the tasks preceding it in this 326 * order complete. We use the "completion map" to associate the next 327 * task in this order for any left child. We increase the pending count 328 * of any node on the right side of such a mapping by one to indicate 329 * its dependency, and when a node on the left side of such a mapping 330 * completes, it decrements the pending count of its corresponding right 331 * side. As the computation tree is expanded by splitting, we must 332 * atomically update the mappings to maintain the invariant that the 333 * completion map maps left children to the next node in the in-order 334 * traversal. 335 * 336 * Take, for example, the following computation tree of tasks: 337 * 338 * a 339 * / \ 340 * b c 341 * / \ / \ 342 * d e f g 343 * 344 * The complete map will contain (not necessarily all at the same time) 345 * the following associations: 346 * 347 * d -> e 348 * b -> f 349 * f -> g 350 * 351 * Tasks e, f, g will have their pending counts increased by 1. 352 * 353 * The following relationships hold: 354 * 355 * - completion of d "happens-before" e; 356 * - completion of d and e "happens-before b; 357 * - completion of b "happens-before" f; and 358 * - completion of f "happens-before" g 359 * 360 * Thus overall the "happens-before" relationship holds for the 361 * reporting of elements, covered by tasks d, e, f and g, as specified 362 * by the forEachOrdered operation. 363 */ 364 365 private final PipelineHelper<T> helper; 366 private Spliterator<S> spliterator; 367 private final long targetSize; 368 private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap; 369 private final Sink<T> action; 370 private final ForEachOrderedTask<S, T> leftPredecessor; 371 private Node<T> node; 372 ForEachOrderedTask(PipelineHelper<T> helper, Spliterator<S> spliterator, Sink<T> action)373 protected ForEachOrderedTask(PipelineHelper<T> helper, 374 Spliterator<S> spliterator, 375 Sink<T> action) { 376 super(null); 377 this.helper = helper; 378 this.spliterator = spliterator; 379 this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); 380 // Size map to avoid concurrent re-sizes 381 this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.LEAF_TARGET << 1)); 382 this.action = action; 383 this.leftPredecessor = null; 384 } 385 ForEachOrderedTask(ForEachOrderedTask<S, T> parent, Spliterator<S> spliterator, ForEachOrderedTask<S, T> leftPredecessor)386 ForEachOrderedTask(ForEachOrderedTask<S, T> parent, 387 Spliterator<S> spliterator, 388 ForEachOrderedTask<S, T> leftPredecessor) { 389 super(parent); 390 this.helper = parent.helper; 391 this.spliterator = spliterator; 392 this.targetSize = parent.targetSize; 393 this.completionMap = parent.completionMap; 394 this.action = parent.action; 395 this.leftPredecessor = leftPredecessor; 396 } 397 398 @Override compute()399 public final void compute() { 400 doCompute(this); 401 } 402 doCompute(ForEachOrderedTask<S, T> task)403 private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) { 404 Spliterator<S> rightSplit = task.spliterator, leftSplit; 405 long sizeThreshold = task.targetSize; 406 boolean forkRight = false; 407 while (rightSplit.estimateSize() > sizeThreshold && 408 (leftSplit = rightSplit.trySplit()) != null) { 409 ForEachOrderedTask<S, T> leftChild = 410 new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor); 411 ForEachOrderedTask<S, T> rightChild = 412 new ForEachOrderedTask<>(task, rightSplit, leftChild); 413 414 // Fork the parent task 415 // Completion of the left and right children "happens-before" 416 // completion of the parent 417 task.addToPendingCount(1); 418 // Completion of the left child "happens-before" completion of 419 // the right child 420 rightChild.addToPendingCount(1); 421 task.completionMap.put(leftChild, rightChild); 422 423 // If task is not on the left spine 424 if (task.leftPredecessor != null) { 425 /* 426 * Completion of left-predecessor, or left subtree, 427 * "happens-before" completion of left-most leaf node of 428 * right subtree. 429 * The left child's pending count needs to be updated before 430 * it is associated in the completion map, otherwise the 431 * left child can complete prematurely and violate the 432 * "happens-before" constraint. 433 */ 434 leftChild.addToPendingCount(1); 435 // Update association of left-predecessor to left-most 436 // leaf node of right subtree 437 if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) { 438 // If replaced, adjust the pending count of the parent 439 // to complete when its children complete 440 task.addToPendingCount(-1); 441 } else { 442 // Left-predecessor has already completed, parent's 443 // pending count is adjusted by left-predecessor; 444 // left child is ready to complete 445 leftChild.addToPendingCount(-1); 446 } 447 } 448 449 ForEachOrderedTask<S, T> taskToFork; 450 if (forkRight) { 451 forkRight = false; 452 rightSplit = leftSplit; 453 task = leftChild; 454 taskToFork = rightChild; 455 } 456 else { 457 forkRight = true; 458 task = rightChild; 459 taskToFork = leftChild; 460 } 461 taskToFork.fork(); 462 } 463 464 /* 465 * Task's pending count is either 0 or 1. If 1 then the completion 466 * map will contain a value that is task, and two calls to 467 * tryComplete are required for completion, one below and one 468 * triggered by the completion of task's left-predecessor in 469 * onCompletion. Therefore there is no data race within the if 470 * block. 471 */ 472 if (task.getPendingCount() > 0) { 473 // Cannot complete just yet so buffer elements into a Node 474 // for use when completion occurs 475 @SuppressWarnings("unchecked") 476 IntFunction<T[]> generator = size -> (T[]) new Object[size]; 477 Node.Builder<T> nb = task.helper.makeNodeBuilder( 478 task.helper.exactOutputSizeIfKnown(rightSplit), 479 generator); 480 task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build(); 481 task.spliterator = null; 482 } 483 task.tryComplete(); 484 } 485 486 @Override onCompletion(CountedCompleter<?> caller)487 public void onCompletion(CountedCompleter<?> caller) { 488 if (node != null) { 489 // Dump buffered elements from this leaf into the sink 490 node.forEach(action); 491 node = null; 492 } 493 else if (spliterator != null) { 494 // Dump elements output from this leaf's pipeline into the sink 495 helper.wrapAndCopyInto(action, spliterator); 496 spliterator = null; 497 } 498 499 // The completion of this task *and* the dumping of elements 500 // "happens-before" completion of the associated left-most leaf task 501 // of right subtree (if any, which can be this task's right sibling) 502 // 503 ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this); 504 if (leftDescendant != null) 505 leftDescendant.tryComplete(); 506 } 507 } 508 } 509