Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions src/java.base/share/classes/java/util/stream/Gatherer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
*/
package java.util.stream;

import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;

import jdk.internal.vm.annotation.ForceInline;

import java.util.*;
Expand Down Expand Up @@ -196,7 +199,8 @@
* @param <R> the type of output elements from the gatherer operation
* @since 24
*/
public interface Gatherer<T, A, R> {
@NullMarked
public interface Gatherer<T extends @Nullable Object, A extends @Nullable Object, R extends @Nullable Object> {
/**
* A function that produces an instance of the intermediate state used for
* this gathering operation.
Expand Down Expand Up @@ -265,7 +269,7 @@ default BiConsumer<A, Downstream<? super R>> finisher() {
* @return returns a composed Gatherer which connects the output of this
* Gatherer as input that Gatherer
*/
default <RR> Gatherer<T, ?, RR> andThen(Gatherer<? super R, ?, ? extends RR> that) {
default <RR extends @Nullable Object> Gatherer<T, ?, RR> andThen(Gatherer<? super R, ?, ? extends RR> that) {
Objects.requireNonNull(that);
return Gatherers.Composite.of(this, that);
}
Expand All @@ -280,7 +284,7 @@ default BiConsumer<A, Downstream<? super R>> finisher() {
* @return the instance of the default initializer
* @param <A> the type of the state of the returned initializer
*/
static <A> Supplier<A> defaultInitializer() {
static <A> Supplier<@Nullable A> defaultInitializer() {
return Gatherers.Value.DEFAULT.initializer();
}

Expand All @@ -295,7 +299,7 @@ static <A> Supplier<A> defaultInitializer() {
* @return the instance of the default combiner
* @param <A> the type of the state of the returned combiner
*/
static <A> BinaryOperator<A> defaultCombiner() {
static <A extends @Nullable Object> BinaryOperator<A> defaultCombiner() {
return Gatherers.Value.DEFAULT.combiner();
}

Expand All @@ -312,7 +316,7 @@ static <A> BinaryOperator<A> defaultCombiner() {
* @param <A> the type of the state of the returned finisher
* @param <R> the type of the Downstream of the returned finisher
*/
static <A, R> BiConsumer<A, Downstream<? super R>> defaultFinisher() {
static <A extends @Nullable Object, R extends @Nullable Object> BiConsumer<A, Downstream<? super R>> defaultFinisher() {
return Gatherers.Value.DEFAULT.finisher();
}

Expand All @@ -326,8 +330,8 @@ static <A, R> BiConsumer<A, Downstream<? super R>> defaultFinisher() {
* @throws NullPointerException if the argument is {@code null}
* @return the new {@code Gatherer}
*/
static <T, R> Gatherer<T, Void, R> ofSequential(
Integrator<Void, T, R> integrator) {
static <T extends @Nullable Object, R extends @Nullable Object> Gatherer<T, @Nullable Void, R> ofSequential(
Integrator<@Nullable Void, T, R> integrator) {
return of(
defaultInitializer(),
integrator,
Expand All @@ -347,9 +351,9 @@ static <T, R> Gatherer<T, Void, R> ofSequential(
* @throws NullPointerException if any argument is {@code null}
* @return the new {@code Gatherer}
*/
static <T, R> Gatherer<T, Void, R> ofSequential(
Integrator<Void, T, R> integrator,
BiConsumer<Void, Downstream<? super R>> finisher) {
static <T extends @Nullable Object, R extends @Nullable Object> Gatherer<T, @Nullable Void, R> ofSequential(
Integrator<@Nullable Void, T, R> integrator,
BiConsumer<@Nullable Void, Downstream<? super R>> finisher) {
return of(
defaultInitializer(),
integrator,
Expand All @@ -370,7 +374,7 @@ static <T, R> Gatherer<T, Void, R> ofSequential(
* @throws NullPointerException if any argument is {@code null}
* @return the new {@code Gatherer}
*/
static <T, A, R> Gatherer<T, A, R> ofSequential(
static <T extends @Nullable Object, A extends @Nullable Object, R extends @Nullable Object> Gatherer<T, A, R> ofSequential(
Supplier<A> initializer,
Integrator<A, T, R> integrator) {
return of(
Expand All @@ -394,7 +398,7 @@ static <T, A, R> Gatherer<T, A, R> ofSequential(
* @throws NullPointerException if any argument is {@code null}
* @return the new {@code Gatherer}
*/
static <T, A, R> Gatherer<T, A, R> ofSequential(
static <T extends @Nullable Object, A extends @Nullable Object, R extends @Nullable Object> Gatherer<T, A, R> ofSequential(
Supplier<A> initializer,
Integrator<A, T, R> integrator,
BiConsumer<A, Downstream<? super R>> finisher) {
Expand All @@ -416,7 +420,7 @@ static <T, A, R> Gatherer<T, A, R> ofSequential(
* @throws NullPointerException if any argument is {@code null}
* @return the new {@code Gatherer}
*/
static <T, R> Gatherer<T, Void, R> of(Integrator<Void, T, R> integrator) {
static <T extends @Nullable Object, R extends @Nullable Object> Gatherer<T, @Nullable Void, R> of(Integrator<@Nullable Void, T, R> integrator) {
return of(
defaultInitializer(),
integrator,
Expand All @@ -436,9 +440,9 @@ static <T, R> Gatherer<T, Void, R> of(Integrator<Void, T, R> integrator) {
* @throws NullPointerException if any argument is {@code null}
* @return the new {@code Gatherer}
*/
static <T, R> Gatherer<T, Void, R> of(
Integrator<Void, T, R> integrator,
BiConsumer<Void, Downstream<? super R>> finisher) {
static <T extends @Nullable Object, R extends @Nullable Object> Gatherer<T, @Nullable Void, R> of(
Integrator<@Nullable Void, T, R> integrator,
BiConsumer<@Nullable Void, Downstream<? super R>> finisher) {
return of(
defaultInitializer(),
integrator,
Expand All @@ -462,7 +466,7 @@ static <T, R> Gatherer<T, Void, R> of(
* @throws NullPointerException if any argument is {@code null}
* @return the new {@code Gatherer}
*/
static <T, A, R> Gatherer<T, A, R> of(
static <T extends @Nullable Object, A extends @Nullable Object, R extends @Nullable Object> Gatherer<T, A, R> of(
Supplier<A> initializer,
Integrator<A, T, R> integrator,
BinaryOperator<A> combiner,
Expand All @@ -482,7 +486,7 @@ static <T, A, R> Gatherer<T, A, R> of(
* @since 24
*/
@FunctionalInterface
interface Downstream<T> {
interface Downstream<T extends @Nullable Object> {

/**
* Pushes, if possible, the provided element downstream -- to the next
Expand Down Expand Up @@ -524,7 +528,7 @@ interface Downstream<T> {
* @since 24
*/
@FunctionalInterface
interface Integrator<A, T, R> {
interface Integrator<A extends @Nullable Object, T extends @Nullable Object, R extends @Nullable Object> {
/**
* Performs an action given: the current state, the next element, and
* a downstream object; potentially inspecting and/or updating
Expand All @@ -550,7 +554,7 @@ interface Integrator<A, T, R> {
* @param <R> the type of results this integrator can produce
*/
@ForceInline
static <A, T, R> Integrator<A, T, R> of(Integrator<A, T, R> integrator) {
static <A extends @Nullable Object, T extends @Nullable Object, R extends @Nullable Object> Integrator<A, T, R> of(Integrator<A, T, R> integrator) {
return integrator;
}

Expand All @@ -565,7 +569,7 @@ static <A, T, R> Integrator<A, T, R> of(Integrator<A, T, R> integrator) {
* @param <R> the type of results this integrator can produce
*/
@ForceInline
static <A, T, R> Greedy<A, T, R> ofGreedy(Greedy<A, T, R> greedy) {
static <A extends @Nullable Object, T extends @Nullable Object, R extends @Nullable Object> Greedy<A, T, R> ofGreedy(Greedy<A, T, R> greedy) {
return greedy;
}

Expand All @@ -583,6 +587,6 @@ static <A, T, R> Greedy<A, T, R> ofGreedy(Greedy<A, T, R> greedy) {
* @since 24
*/
@FunctionalInterface
interface Greedy<A, T, R> extends Integrator<A, T, R> { }
interface Greedy<A extends @Nullable Object, T extends @Nullable Object, R extends @Nullable Object> extends Integrator<A, T, R> { }
}
}
14 changes: 9 additions & 5 deletions src/java.base/share/classes/java/util/stream/Gatherers.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
*/
package java.util.stream;

import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;

import jdk.internal.access.SharedSecrets;
import jdk.internal.vm.annotation.ForceInline;

Expand All @@ -49,6 +52,7 @@
*
* @since 24
*/
@NullMarked
public final class Gatherers {
private Gatherers() { } // This class is not intended to be instantiated

Expand Down Expand Up @@ -83,7 +87,7 @@ private Gatherers() { } // This class is not intended to be instantiated
* @return a new gatherer which groups elements into fixed-size windows
* @throws IllegalArgumentException when {@code windowSize} is less than 1
*/
public static <TR> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize) {
public static <TR extends @Nullable Object> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize) {
if (windowSize < 1)
throw new IllegalArgumentException("'windowSize' must be greater than zero");

Expand Down Expand Up @@ -172,7 +176,7 @@ void finish(Downstream<? super List<TR>> downstream) {
* @return a new gatherer which groups elements into sliding windows
* @throws IllegalArgumentException when windowSize is less than 1
*/
public static <TR> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize) {
public static <TR extends @Nullable Object> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize) {
if (windowSize < 1)
throw new IllegalArgumentException("'windowSize' must be greater than zero");

Expand Down Expand Up @@ -259,7 +263,7 @@ void finish(Downstream<? super List<TR>> downstream) {
* @return a new Gatherer
* @throws NullPointerException if any of the parameters are {@code null}
*/
public static <T, R> Gatherer<T, ?, R> fold(
public static <T extends @Nullable Object, R extends @Nullable Object> Gatherer<T, ?, R> fold(
Supplier<R> initial,
BiFunction<? super R, ? super T, ? extends R> folder) {
Objects.requireNonNull(initial, "'initial' must not be null");
Expand Down Expand Up @@ -306,7 +310,7 @@ class State {
* @return a new Gatherer which performs a prefix scan
* @throws NullPointerException if any of the parameters are {@code null}
*/
public static <T, R> Gatherer<T, ?, R> scan(
public static <T extends @Nullable Object, R extends @Nullable Object> Gatherer<T, ?, R> scan(
Supplier<R> initial,
BiFunction<? super R, ? super T, ? extends R> scanner) {
Objects.requireNonNull(initial, "'initial' must not be null");
Expand Down Expand Up @@ -346,7 +350,7 @@ boolean integrate(T element, Downstream<? super R> downstream) {
* @throws IllegalArgumentException if {@code maxConcurrency} is less than 1
* @throws NullPointerException if {@code mapper} is {@code null}
*/
public static <T, R> Gatherer<T,?,R> mapConcurrent(
public static <T extends @Nullable Object, R extends @Nullable Object> Gatherer<T,?,R> mapConcurrent(
final int maxConcurrency,
final Function<? super T, ? extends R> mapper) {
if (maxConcurrency < 1)
Expand Down
2 changes: 1 addition & 1 deletion src/java.base/share/classes/java/util/stream/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ default Stream<T> dropWhile(Predicate<? super T> predicate) {
* @return the new stream
* @since 24
*/
default <R> Stream<R> gather(Gatherer<? super T, ?, R> gatherer) {
default <R extends @Nullable Object> Stream<R> gather(Gatherer<? super T, ?, R> gatherer) {
return StreamSupport.stream(spliterator(), isParallel())
.gather(gatherer)
.onClose(this::close);
Expand Down
Loading