1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Objects;
28 import java.util.function.Consumer;
29 import java.util.function.DoubleConsumer;
30 import java.util.function.IntConsumer;
31 import java.util.function.LongConsumer;
32 
33 /**
34  * An extension of {@link Consumer} used to conduct values through the stages of
35  * a stream pipeline, with additional methods to manage size information,
36  * control flow, etc.  Before calling the {@code accept()} method on a
37  * {@code Sink} for the first time, you must first call the {@code begin()}
38  * method to inform it that data is coming (optionally informing the sink how
39  * much data is coming), and after all data has been sent, you must call the
40  * {@code end()} method.  After calling {@code end()}, you should not call
41  * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
42  * offers a mechanism by which the sink can cooperatively signal that it does
43  * not wish to receive any more data (the {@code cancellationRequested()}
44  * method), which a source can poll before sending more data to the
45  * {@code Sink}.
46  *
47  * <p>A sink may be in one of two states: an initial state and an active state.
48  * It starts out in the initial state; the {@code begin()} method transitions
49  * it to the active state, and the {@code end()} method transitions it back into
50  * the initial state, where it can be re-used.  Data-accepting methods (such as
51  * {@code accept()} are only valid in the active state.
52  *
53  * @apiNote
54  * A stream pipeline consists of a source, zero or more intermediate stages
55  * (such as filtering or mapping), and a terminal stage, such as reduction or
56  * for-each.  For concreteness, consider the pipeline:
57  *
58  * <pre>{@code
59  *     int longestStringLengthStartingWithA
60  *         = strings.stream()
61  *                  .filter(s -> s.startsWith("A"))
62  *                  .mapToInt(String::length)
63  *                  .max();
64  * }</pre>
65  *
66  * <p>Here, we have three stages, filtering, mapping, and reducing.  The
67  * filtering stage consumes strings and emits a subset of those strings; the
68  * mapping stage consumes strings and emits ints; the reduction stage consumes
69  * those ints and computes the maximal value.
70  *
71  * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
72  * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
73  * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
74  * not need a specialized interface for each primitive specialization.  (It
75  * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
76  * point to the pipeline is the {@code Sink} for the filtering stage, which
77  * sends some elements "downstream" -- into the {@code Sink} for the mapping
78  * stage, which in turn sends integral values downstream into the {@code Sink}
79  * for the reduction stage. The {@code Sink} implementations associated with a
80  * given stage is expected to know the data type for the next stage, and call
81  * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
82  * each stage must implement the correct {@code accept} method corresponding to
83  * the data type it accepts.
84  *
85  * <p>The specialized subtypes such as {@link Sink.OfInt} override
86  * {@code accept(Object)} to call the appropriate primitive specialization of
87  * {@code accept}, implement the appropriate primitive specialization of
88  * {@code Consumer}, and re-abstract the appropriate primitive specialization of
89  * {@code accept}.
90  *
91  * <p>The chaining subtypes such as {@link ChainedInt} not only implement
92  * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
93  * represents the downstream {@code Sink}, and implement the methods
94  * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
95  * delegate to the downstream {@code Sink}.  Most implementations of
96  * intermediate operations will use these chaining wrappers.  For example, the
97  * mapping stage in the above example would look like:
98  *
99  * <pre>{@code
100  *     IntSink is = new Sink.ChainedReference<U>(sink) {
101  *         public void accept(U u) {
102  *             downstream.accept(mapper.applyAsInt(u));
103  *         }
104  *     };
105  * }</pre>
106  *
107  * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
108  * to receive elements of type {@code U} as input, and pass the downstream sink
109  * to the constructor.  Because the next stage expects to receive integers, we
110  * must call the {@code accept(int)} method when emitting values to the downstream.
111  * The {@code accept()} method applies the mapping function from {@code U} to
112  * {@code int} and passes the resulting value to the downstream {@code Sink}.
113  *
114  * @param <T> type of elements for value streams
115  * @since 1.8
116  * @hide Visible for CTS testing only (OpenJDK8 tests).
117  */
118 public interface Sink<T> extends Consumer<T> {
119     /**
120      * Resets the sink state to receive a fresh data set.  This must be called
121      * before sending any data to the sink.  After calling {@link #end()},
122      * you may call this method to reset the sink for another calculation.
123      * @param size The exact size of the data to be pushed downstream, if
124      * known or {@code -1} if unknown or infinite.
125      *
126      * <p>Prior to this call, the sink must be in the initial state, and after
127      * this call it is in the active state.
128      */
begin(long size)129     default void begin(long size) {}
130 
131     /**
132      * Indicates that all elements have been pushed.  If the {@code Sink} is
133      * stateful, it should send any stored state downstream at this time, and
134      * should clear any accumulated state (and associated resources).
135      *
136      * <p>Prior to this call, the sink must be in the active state, and after
137      * this call it is returned to the initial state.
138      */
end()139     default void end() {}
140 
141     /**
142      * Indicates that this {@code Sink} does not wish to receive any more data.
143      *
144      * @implSpec The default implementation always returns false.
145      *
146      * @return true if cancellation is requested
147      */
cancellationRequested()148     default boolean cancellationRequested() {
149         return false;
150     }
151 
152     /**
153      * Accepts an int value.
154      *
155      * @implSpec The default implementation throws IllegalStateException.
156      *
157      * @throws IllegalStateException if this sink does not accept int values
158      */
accept(int value)159     default void accept(int value) {
160         throw new IllegalStateException("called wrong accept method");
161     }
162 
163     /**
164      * Accepts a long value.
165      *
166      * @implSpec The default implementation throws IllegalStateException.
167      *
168      * @throws IllegalStateException if this sink does not accept long values
169      */
accept(long value)170     default void accept(long value) {
171         throw new IllegalStateException("called wrong accept method");
172     }
173 
174     /**
175      * Accepts a double value.
176      *
177      * @implSpec The default implementation throws IllegalStateException.
178      *
179      * @throws IllegalStateException if this sink does not accept double values
180      */
accept(double value)181     default void accept(double value) {
182         throw new IllegalStateException("called wrong accept method");
183     }
184 
185     /**
186      * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
187      * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
188      * {@code accept(int)}.
189      */
190     interface OfInt extends Sink<Integer>, IntConsumer {
191         @Override
accept(int value)192         void accept(int value);
193 
194         @Override
accept(Integer i)195         default void accept(Integer i) {
196             if (Tripwire.ENABLED)
197                 Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
198             accept(i.intValue());
199         }
200     }
201 
202     /**
203      * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
204      * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
205      * {@code accept(long)}.
206      */
207     interface OfLong extends Sink<Long>, LongConsumer {
208         @Override
accept(long value)209         void accept(long value);
210 
211         @Override
accept(Long i)212         default void accept(Long i) {
213             if (Tripwire.ENABLED)
214                 Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
215             accept(i.longValue());
216         }
217     }
218 
219     /**
220      * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
221      * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
222      * {@code accept(double)}.
223      */
224     interface OfDouble extends Sink<Double>, DoubleConsumer {
225         @Override
accept(double value)226         void accept(double value);
227 
228         @Override
accept(Double i)229         default void accept(Double i) {
230             if (Tripwire.ENABLED)
231                 Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
232             accept(i.doubleValue());
233         }
234     }
235 
236     /**
237      * Abstract {@code Sink} implementation for creating chains of
238      * sinks.  The {@code begin}, {@code end}, and
239      * {@code cancellationRequested} methods are wired to chain to the
240      * downstream {@code Sink}.  This implementation takes a downstream
241      * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
242      * implementation of the {@code accept()} method must call the correct
243      * {@code accept()} method on the downstream {@code Sink}.
244      */
245     static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
246         protected final Sink<? super E_OUT> downstream;
247 
ChainedReference(Sink<? super E_OUT> downstream)248         public ChainedReference(Sink<? super E_OUT> downstream) {
249             this.downstream = Objects.requireNonNull(downstream);
250         }
251 
252         @Override
begin(long size)253         public void begin(long size) {
254             downstream.begin(size);
255         }
256 
257         @Override
end()258         public void end() {
259             downstream.end();
260         }
261 
262         @Override
cancellationRequested()263         public boolean cancellationRequested() {
264             return downstream.cancellationRequested();
265         }
266     }
267 
268     /**
269      * Abstract {@code Sink} implementation designed for creating chains of
270      * sinks.  The {@code begin}, {@code end}, and
271      * {@code cancellationRequested} methods are wired to chain to the
272      * downstream {@code Sink}.  This implementation takes a downstream
273      * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
274      * The implementation of the {@code accept()} method must call the correct
275      * {@code accept()} method on the downstream {@code Sink}.
276      */
277     static abstract class ChainedInt<E_OUT> implements Sink.OfInt {
278         protected final Sink<? super E_OUT> downstream;
279 
ChainedInt(Sink<? super E_OUT> downstream)280         public ChainedInt(Sink<? super E_OUT> downstream) {
281             this.downstream = Objects.requireNonNull(downstream);
282         }
283 
284         @Override
begin(long size)285         public void begin(long size) {
286             downstream.begin(size);
287         }
288 
289         @Override
end()290         public void end() {
291             downstream.end();
292         }
293 
294         @Override
cancellationRequested()295         public boolean cancellationRequested() {
296             return downstream.cancellationRequested();
297         }
298     }
299 
300     /**
301      * Abstract {@code Sink} implementation designed for creating chains of
302      * sinks.  The {@code begin}, {@code end}, and
303      * {@code cancellationRequested} methods are wired to chain to the
304      * downstream {@code Sink}.  This implementation takes a downstream
305      * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
306      * The implementation of the {@code accept()} method must call the correct
307      * {@code accept()} method on the downstream {@code Sink}.
308      */
309     static abstract class ChainedLong<E_OUT> implements Sink.OfLong {
310         protected final Sink<? super E_OUT> downstream;
311 
ChainedLong(Sink<? super E_OUT> downstream)312         public ChainedLong(Sink<? super E_OUT> downstream) {
313             this.downstream = Objects.requireNonNull(downstream);
314         }
315 
316         @Override
begin(long size)317         public void begin(long size) {
318             downstream.begin(size);
319         }
320 
321         @Override
end()322         public void end() {
323             downstream.end();
324         }
325 
326         @Override
cancellationRequested()327         public boolean cancellationRequested() {
328             return downstream.cancellationRequested();
329         }
330     }
331 
332     /**
333      * Abstract {@code Sink} implementation designed for creating chains of
334      * sinks.  The {@code begin}, {@code end}, and
335      * {@code cancellationRequested} methods are wired to chain to the
336      * downstream {@code Sink}.  This implementation takes a downstream
337      * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
338      * The implementation of the {@code accept()} method must call the correct
339      * {@code accept()} method on the downstream {@code Sink}.
340      */
341     static abstract class ChainedDouble<E_OUT> implements Sink.OfDouble {
342         protected final Sink<? super E_OUT> downstream;
343 
ChainedDouble(Sink<? super E_OUT> downstream)344         public ChainedDouble(Sink<? super E_OUT> downstream) {
345             this.downstream = Objects.requireNonNull(downstream);
346         }
347 
348         @Override
begin(long size)349         public void begin(long size) {
350             downstream.begin(size);
351         }
352 
353         @Override
end()354         public void end() {
355             downstream.end();
356         }
357 
358         @Override
cancellationRequested()359         public boolean cancellationRequested() {
360             return downstream.cancellationRequested();
361         }
362     }
363 }
364