FluxFlatMapUsage

ERROR

LikelyError

View source code on GitHub

Summary

Flux#flatMap and Flux#flatMapSequential have subtle semantics; please use Flux#concatMap or explicitly specify the desired amount of concurrency

Suppression

Suppress false positives by adding the suppression annotation @SuppressWarnings("FluxFlatMapUsage") to the enclosing element.

Disable this pattern completely by adding -Xep:FluxFlatMapUsage:OFF as a compiler argument. Learn more.

Samples

Replacement

Shows the difference in example code before and after the bug pattern is applied.

 import reactor.core.publisher.Flux;
 
 // In this variant, the single-argument flatMap/flatMapSequential calls made directly on a Flux
 // are rewritten to concatMap, while the calls that follow groupBy keep their operator and gain
 // an explicit MAX_CONCURRENCY argument.
 class A {
   private static final int MAX_CONCURRENCY = 8;
 
   void m() {
-    Flux.just(1).flatMap(Flux::just);
-    Flux.just(1).flatMapSequential(Flux::just);
-    Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);
-    Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);
+    Flux.just(1).concatMap(Flux::just);
+    Flux.just(1).concatMap(Flux::just);
+    Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just, MAX_CONCURRENCY);
+    Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just, MAX_CONCURRENCY);
   }
 }
 
 import reactor.core.publisher.Flux;
 
 // In this variant, the single-argument flatMap/flatMapSequential calls made directly on a Flux
 // keep their operator and gain an explicit MAX_CONCURRENCY argument, while the calls that
 // follow groupBy are rewritten to concatMap.
 class A {
   private static final int MAX_CONCURRENCY = 8;
 
   void m() {
-    Flux.just(1).flatMap(Flux::just);
-    Flux.just(1).flatMapSequential(Flux::just);
-    Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);
-    Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);
+    Flux.just(1).flatMap(Flux::just, MAX_CONCURRENCY);
+    Flux.just(1).flatMapSequential(Flux::just, MAX_CONCURRENCY);
+    Flux.just(1, 2).groupBy(i -> i).concatMap(Flux::just);
+    Flux.just(1, 2).groupBy(i -> i).concatMap(Flux::just);
   }
 }
 

Identification

Shows code lines which will (not) be flagged by this bug pattern.
A "// BUG: Diagnostic contains:" comment is placed directly above each violating line.

import java.util.function.BiFunction;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Identification sample for the {@code FluxFlatMapUsage} pattern.
 *
 * <p>Statements directly preceded by a {@code // BUG: Diagnostic contains:} marker are the ones
 * expected to be flagged; every other statement is expected to pass without a diagnostic.
 */
class A {
  void m() {
    // Flagged: single-argument flatMap / flatMapSequential invoked on a Flux, with a method
    // reference or a lambda, with or without an explicit type argument, and both directly on the
    // Flux and after groupBy.
    // BUG: Diagnostic contains:
    Flux.just(1).flatMap(Flux::just);
    // BUG: Diagnostic contains:
    Flux.just(1).<String>flatMap(i -> Flux.just(String.valueOf(i)));
    // BUG: Diagnostic contains:
    Flux.just(1).flatMapSequential(Flux::just);
    // BUG: Diagnostic contains:
    Flux.just(1).<String>flatMapSequential(i -> Flux.just(String.valueOf(i)));
    // BUG: Diagnostic contains:
    Flux.just(1, 2).groupBy(i -> i).flatMap(Flux::just);
    // BUG: Diagnostic contains:
    Flux.just(1, 2).groupBy(i -> i).<String>flatMap(i -> Flux.just(String.valueOf(i)));
    // BUG: Diagnostic contains:
    Flux.just(1, 2).groupBy(i -> i).flatMapSequential(Flux::just);
    // BUG: Diagnostic contains:
    Flux.just(1, 2).groupBy(i -> i).<String>flatMapSequential(i -> Flux.just(String.valueOf(i)));

    // Not flagged: flatMap on a Mono, and concatMap on a Flux.
    Mono.just(1).flatMap(Mono::just);
    Flux.just(1).concatMap(Flux::just);

    // Not flagged: flatMap overloads that take arguments beyond the single mapper function.
    Flux.just(1).flatMap(Flux::just, 1);
    Flux.just(1).flatMap(Flux::just, 1, 1);
    Flux.just(1).flatMap(Flux::just, throwable -> Flux.empty(), Flux::empty);

    // Not flagged: flatMapSequential overloads that take additional integer arguments.
    Flux.just(1).flatMapSequential(Flux::just, 1);
    Flux.just(1).flatMapSequential(Flux::just, 1, 1);

    // Flagged: method references to the single-argument Flux operators, with and without an
    // explicit type argument on the reference.
    // BUG: Diagnostic contains:
    this.<String, Flux<String>>sink(Flux::flatMap);
    // BUG: Diagnostic contains:
    this.<Integer, Flux<Integer>>sink(Flux::<Integer>flatMap);

    // BUG: Diagnostic contains:
    this.<String, Flux<String>>sink(Flux::flatMapSequential);
    // BUG: Diagnostic contains:
    this.<Integer, Flux<Integer>>sink(Flux::<Integer>flatMapSequential);

    // Not flagged: the equivalent method reference on Mono.
    this.<String, Mono<String>>sink(Mono::flatMap);
  }

  /**
   * No-op helper; its {@link BiFunction} parameter only serves as the target type for the method
   * references passed in {@link #m()}.
   */
  private <T, P> void sink(BiFunction<P, Function<T, P>, P> fun) {}
}

Copyright © 2017-2024 Picnic Technologies BV