FluxFlatMapUsage
ERROR
LikelyError
Summary
Flux#flatMap
andFlux#flatMapSequential
have subtle semantics; please useFlux#concatMap
or explicitly specify the desired amount of concurrency
Suppression
Suppress false positives by adding the suppression annotation
@SuppressWarnings("FluxFlatMapUsage")
to the enclosing element.Disable this pattern completely by adding
-Xep:FluxFlatMapUsage:OFF
as compiler argument. Learn more.
Samples
Replacement
Shows the difference in example code before and after the bug pattern is applied.
import reactor.core.publisher.Flux;
class A {
private static final int MAX_CONCURRENCY = 8;
void m() {
- Flux.just(1).flatMap(Flux::just);
- Flux.just(1).flatMapSequential(Flux::just);
- Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);
- Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);
+ Flux.just(1).concatMap(Flux::just);
+ Flux.just(1).concatMap(Flux::just);
+ Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just, MAX_CONCURRENCY);
+ Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just, MAX_CONCURRENCY);
}
}
import reactor.core.publisher.Flux;
class A {
private static final int MAX_CONCURRENCY = 8;
void m() {
- Flux.just(1).flatMap(Flux::just);
- Flux.just(1).flatMapSequential(Flux::just);
- Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);
- Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);
+ Flux.just(1).flatMap(Flux::just, MAX_CONCURRENCY);
+ Flux.just(1).flatMapSequential(Flux::just, MAX_CONCURRENCY);
+ Flux.just(1, 2).groupBy(i -> i).concatMap(Flux::just);
+ Flux.just(1, 2).groupBy(i -> i).concatMap(Flux::just);
}
}
Identification
Shows code lines which will (not) be flagged by this bug pattern.
A //BUG: Diagnostic contains:
comment is placed above any violating line.
import java.util.function.BiFunction;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
class A {
void m() {
// BUG: Diagnostic contains:
Flux.just(1).flatMap(Flux::just);
// BUG: Diagnostic contains:
Flux.just(1).<String>flatMap(i -> Flux.just(String.valueOf(i)));
// BUG: Diagnostic contains:
Flux.just(1).flatMapSequential(Flux::just);
// BUG: Diagnostic contains:
Flux.just(1).<String>flatMapSequential(i -> Flux.just(String.valueOf(i)));
// BUG: Diagnostic contains:
Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);
// BUG: Diagnostic contains:
Flux.just(1, 2).groupBy(i -> i).<String>flatMap(i -> Flux.just(String.valueOf(i)));
// BUG: Diagnostic contains:
Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);
// BUG: Diagnostic contains:
Flux.just(1, 2).groupBy(i -> i).<String>flatMapSequential(i -> Flux.just(String.valueOf(i)));
Mono.just(1).flatMap(Mono::just);
Flux.just(1).concatMap(Flux::just);
Flux.just(1).flatMap(Flux::just, 1);
Flux.just(1).flatMap(Flux::just, 1, 1);
Flux.just(1).flatMap(Flux::just, throwable -> Flux.empty(), Flux::empty);
Flux.just(1).flatMapSequential(Flux::just, 1);
Flux.just(1).flatMapSequential(Flux::just, 1, 1);
// BUG: Diagnostic contains:
this.<String, Flux<String>>sink(Flux::flatMap);
// BUG: Diagnostic contains:
this.<Integer, Flux<Integer>>sink(Flux::<Integer>flatMap);
// BUG: Diagnostic contains:
this.<String, Flux<String>>sink(Flux::flatMapSequential);
// BUG: Diagnostic contains:
this.<Integer, Flux<Integer>>sink(Flux::<Integer>flatMapSequential);
this.<String, Mono<String>>sink(Mono::flatMap);
}
private <T, P> void sink(BiFunction<P, Function<T, P>, P> fun) {}
}