1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Objects;
28 import java.util.function.Consumer;
29 import java.util.function.DoubleConsumer;
30 import java.util.function.IntConsumer;
31 import java.util.function.LongConsumer;
32 
33 /**
34  * An extension of {@link Consumer} used to conduct values through the stages of
35  * a stream pipeline, with additional methods to manage size information,
36  * control flow, etc.  Before calling the {@code accept()} method on a
37  * {@code Sink} for the first time, you must first call the {@code begin()}
38  * method to inform it that data is coming (optionally informing the sink how
39  * much data is coming), and after all data has been sent, you must call the
40  * {@code end()} method.  After calling {@code end()}, you should not call
41  * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
42  * offers a mechanism by which the sink can cooperatively signal that it does
43  * not wish to receive any more data (the {@code cancellationRequested()}
44  * method), which a source can poll before sending more data to the
45  * {@code Sink}.
46  *
47  * <p>A sink may be in one of two states: an initial state and an active state.
48  * It starts out in the initial state; the {@code begin()} method transitions
49  * it to the active state, and the {@code end()} method transitions it back into
50  * the initial state, where it can be re-used.  Data-accepting methods (such as
51  * {@code accept()} are only valid in the active state.
52  *
53  * @apiNote
54  * A stream pipeline consists of a source, zero or more intermediate stages
55  * (such as filtering or mapping), and a terminal stage, such as reduction or
56  * for-each.  For concreteness, consider the pipeline:
57  *
58  * <pre>{@code
59  *     int longestStringLengthStartingWithA
60  *         = strings.stream()
61  *                  .filter(s -> s.startsWith("A"))
62  *                  .mapToInt(String::length)
63  *                  .max();
64  * }</pre>
65  *
66  * <p>Here, we have three stages, filtering, mapping, and reducing.  The
67  * filtering stage consumes strings and emits a subset of those strings; the
68  * mapping stage consumes strings and emits ints; the reduction stage consumes
69  * those ints and computes the maximal value.
70  *
71  * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
72  * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
73  * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
74  * not need a specialized interface for each primitive specialization.  (It
75  * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
76  * point to the pipeline is the {@code Sink} for the filtering stage, which
77  * sends some elements "downstream" -- into the {@code Sink} for the mapping
78  * stage, which in turn sends integral values downstream into the {@code Sink}
79  * for the reduction stage. The {@code Sink} implementations associated with a
80  * given stage is expected to know the data type for the next stage, and call
81  * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
82  * each stage must implement the correct {@code accept} method corresponding to
83  * the data type it accepts.
84  *
85  * <p>The specialized subtypes such as {@link Sink.OfInt} override
86  * {@code accept(Object)} to call the appropriate primitive specialization of
87  * {@code accept}, implement the appropriate primitive specialization of
88  * {@code Consumer}, and re-abstract the appropriate primitive specialization of
89  * {@code accept}.
90  *
91  * <p>The chaining subtypes such as {@link ChainedInt} not only implement
92  * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
93  * represents the downstream {@code Sink}, and implement the methods
94  * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
95  * delegate to the downstream {@code Sink}.  Most implementations of
96  * intermediate operations will use these chaining wrappers.  For example, the
97  * mapping stage in the above example would look like:
98  *
99  * <pre>{@code
100  *     IntSink is = new Sink.ChainedReference<U>(sink) {
101  *         public void accept(U u) {
102  *             downstream.accept(mapper.applyAsInt(u));
103  *         }
104  *     };
105  * }</pre>
106  *
107  * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
108  * to receive elements of type {@code U} as input, and pass the downstream sink
109  * to the constructor.  Because the next stage expects to receive integers, we
110  * must call the {@code accept(int)} method when emitting values to the downstream.
111  * The {@code accept()} method applies the mapping function from {@code U} to
112  * {@code int} and passes the resulting value to the downstream {@code Sink}.
113  *
114  * @param <T> type of elements for value streams
115  * @since 1.8
116  * @hide Visible for CTS testing only (OpenJDK8 tests).
117  */
118 // Android-changed: Made public for CTS tests only.
119 public interface Sink<T> extends Consumer<T> {
120     /**
121      * Resets the sink state to receive a fresh data set.  This must be called
122      * before sending any data to the sink.  After calling {@link #end()},
123      * you may call this method to reset the sink for another calculation.
124      * @param size The exact size of the data to be pushed downstream, if
125      * known or {@code -1} if unknown or infinite.
126      *
127      * <p>Prior to this call, the sink must be in the initial state, and after
128      * this call it is in the active state.
129      */
begin(long size)130     default void begin(long size) {}
131 
132     /**
133      * Indicates that all elements have been pushed.  If the {@code Sink} is
134      * stateful, it should send any stored state downstream at this time, and
135      * should clear any accumulated state (and associated resources).
136      *
137      * <p>Prior to this call, the sink must be in the active state, and after
138      * this call it is returned to the initial state.
139      */
end()140     default void end() {}
141 
142     /**
143      * Indicates that this {@code Sink} does not wish to receive any more data.
144      *
145      * @implSpec The default implementation always returns false.
146      *
147      * @return true if cancellation is requested
148      */
cancellationRequested()149     default boolean cancellationRequested() {
150         return false;
151     }
152 
153     /**
154      * Accepts an int value.
155      *
156      * @implSpec The default implementation throws IllegalStateException.
157      *
158      * @throws IllegalStateException if this sink does not accept int values
159      */
accept(int value)160     default void accept(int value) {
161         throw new IllegalStateException("called wrong accept method");
162     }
163 
164     /**
165      * Accepts a long value.
166      *
167      * @implSpec The default implementation throws IllegalStateException.
168      *
169      * @throws IllegalStateException if this sink does not accept long values
170      */
accept(long value)171     default void accept(long value) {
172         throw new IllegalStateException("called wrong accept method");
173     }
174 
175     /**
176      * Accepts a double value.
177      *
178      * @implSpec The default implementation throws IllegalStateException.
179      *
180      * @throws IllegalStateException if this sink does not accept double values
181      */
accept(double value)182     default void accept(double value) {
183         throw new IllegalStateException("called wrong accept method");
184     }
185 
186     /**
187      * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
188      * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
189      * {@code accept(int)}.
190      */
191     interface OfInt extends Sink<Integer>, IntConsumer {
192         @Override
accept(int value)193         void accept(int value);
194 
195         @Override
accept(Integer i)196         default void accept(Integer i) {
197             if (Tripwire.ENABLED)
198                 Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
199             accept(i.intValue());
200         }
201     }
202 
203     /**
204      * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
205      * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
206      * {@code accept(long)}.
207      */
208     interface OfLong extends Sink<Long>, LongConsumer {
209         @Override
accept(long value)210         void accept(long value);
211 
212         @Override
accept(Long i)213         default void accept(Long i) {
214             if (Tripwire.ENABLED)
215                 Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
216             accept(i.longValue());
217         }
218     }
219 
220     /**
221      * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
222      * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
223      * {@code accept(double)}.
224      */
225     interface OfDouble extends Sink<Double>, DoubleConsumer {
226         @Override
accept(double value)227         void accept(double value);
228 
229         @Override
accept(Double i)230         default void accept(Double i) {
231             if (Tripwire.ENABLED)
232                 Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
233             accept(i.doubleValue());
234         }
235     }
236 
237     /**
238      * Abstract {@code Sink} implementation for creating chains of
239      * sinks.  The {@code begin}, {@code end}, and
240      * {@code cancellationRequested} methods are wired to chain to the
241      * downstream {@code Sink}.  This implementation takes a downstream
242      * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
243      * implementation of the {@code accept()} method must call the correct
244      * {@code accept()} method on the downstream {@code Sink}.
245      */
246     abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
247         protected final Sink<? super E_OUT> downstream;
248 
ChainedReference(Sink<? super E_OUT> downstream)249         public ChainedReference(Sink<? super E_OUT> downstream) {
250             this.downstream = Objects.requireNonNull(downstream);
251         }
252 
253         @Override
begin(long size)254         public void begin(long size) {
255             downstream.begin(size);
256         }
257 
258         @Override
end()259         public void end() {
260             downstream.end();
261         }
262 
263         @Override
cancellationRequested()264         public boolean cancellationRequested() {
265             return downstream.cancellationRequested();
266         }
267     }
268 
269     /**
270      * Abstract {@code Sink} implementation designed for creating chains of
271      * sinks.  The {@code begin}, {@code end}, and
272      * {@code cancellationRequested} methods are wired to chain to the
273      * downstream {@code Sink}.  This implementation takes a downstream
274      * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
275      * The implementation of the {@code accept()} method must call the correct
276      * {@code accept()} method on the downstream {@code Sink}.
277      */
278     abstract static class ChainedInt<E_OUT> implements Sink.OfInt {
279         protected final Sink<? super E_OUT> downstream;
280 
ChainedInt(Sink<? super E_OUT> downstream)281         public ChainedInt(Sink<? super E_OUT> downstream) {
282             this.downstream = Objects.requireNonNull(downstream);
283         }
284 
285         @Override
begin(long size)286         public void begin(long size) {
287             downstream.begin(size);
288         }
289 
290         @Override
end()291         public void end() {
292             downstream.end();
293         }
294 
295         @Override
cancellationRequested()296         public boolean cancellationRequested() {
297             return downstream.cancellationRequested();
298         }
299     }
300 
301     /**
302      * Abstract {@code Sink} implementation designed for creating chains of
303      * sinks.  The {@code begin}, {@code end}, and
304      * {@code cancellationRequested} methods are wired to chain to the
305      * downstream {@code Sink}.  This implementation takes a downstream
306      * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
307      * The implementation of the {@code accept()} method must call the correct
308      * {@code accept()} method on the downstream {@code Sink}.
309      */
310     abstract static class ChainedLong<E_OUT> implements Sink.OfLong {
311         protected final Sink<? super E_OUT> downstream;
312 
ChainedLong(Sink<? super E_OUT> downstream)313         public ChainedLong(Sink<? super E_OUT> downstream) {
314             this.downstream = Objects.requireNonNull(downstream);
315         }
316 
317         @Override
begin(long size)318         public void begin(long size) {
319             downstream.begin(size);
320         }
321 
322         @Override
end()323         public void end() {
324             downstream.end();
325         }
326 
327         @Override
cancellationRequested()328         public boolean cancellationRequested() {
329             return downstream.cancellationRequested();
330         }
331     }
332 
333     /**
334      * Abstract {@code Sink} implementation designed for creating chains of
335      * sinks.  The {@code begin}, {@code end}, and
336      * {@code cancellationRequested} methods are wired to chain to the
337      * downstream {@code Sink}.  This implementation takes a downstream
338      * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
339      * The implementation of the {@code accept()} method must call the correct
340      * {@code accept()} method on the downstream {@code Sink}.
341      */
342     abstract static class ChainedDouble<E_OUT> implements Sink.OfDouble {
343         protected final Sink<? super E_OUT> downstream;
344 
ChainedDouble(Sink<? super E_OUT> downstream)345         public ChainedDouble(Sink<? super E_OUT> downstream) {
346             this.downstream = Objects.requireNonNull(downstream);
347         }
348 
349         @Override
begin(long size)350         public void begin(long size) {
351             downstream.begin(size);
352         }
353 
354         @Override
end()355         public void end() {
356             downstream.end();
357         }
358 
359         @Override
cancellationRequested()360         public boolean cancellationRequested() {
361             return downstream.cancellationRequested();
362         }
363     }
364 }
365