In the previous chapter, we learnt about some basic fundamentals operator for Mono and Flux. As part of this chapter, we’ll dig deeper and understand some more advanced but essential operators.

Advanced Operators of Mono

flatMap

Mono.flatMap() is used to transforms the emitted item into another Mono, allowing for asynchronous processing and composition of Mono streams.

Syntax:

public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)

Example:

Mono.just("Hello")
.flatMap(str -> Mono.just("%s World!".formatted(str)))
.subscribe(System.out::println);
// Output: Hello World!

flatMapMany

Mono.flatMapMany() is used to transforms a Mono into a Flux by applying a function to each emitted item.

Syntax:

public final <R> Flux<R> flatMapMany(Function<? super T, ? extends Publisher<? extends R>> mapper)

Example:

Mono.just("Hello")
.flatMapMany(str -> Flux.just(str.split("")))
.subscribe(System.out::println);
// Output: H
// e
// l
// l
// o

concatWith

Mono.concatWith() is used to concatenate the current Mono with another Mono, ensuring that the second Mono is subscribed only after the completion of the first.

Syntax:

public final Flux<T> concatWith(Publisher<? extends T> other)

Example:

Mono.just("Hello")
.concatWith(Mono.just("World!"))
.doOnNext(System.out::println)
.subscribe();
// Output: Hello
// World

mergeWith

Mono.mergeWith() is used to merge emission of current Mono with the given publisher. The emitted element from the current Mono maybe interleaved with the elements from the publisher.

Syntax:

public final Flux<T> mergeWith(Publisher<? extends T> other)

Example:

Mono.just("Hello")
.mergeWith(Mono.just("World!"))
.subscribe(System.out::println);
// Output: Hello
// World

Both mergeWith and concatWith operators in Project Reactor’s Mono class combine data from two Mono sources. They take another Mono and return a new Mono with the combined data. The key difference is in emission order.

  • mergeWith (eager): Subscribes to both sources at once. It emits elements from whichever source is ready first, leading to an unpredictable order in the final output.
  • concatWith (sequential): Subscribes to the second source only after the first finishes. This ensures elements from the first Mono come before the second, preserving the original order.

zip

Mono.zip() is used to combine data from multiple Mono sources into a single emission. It achieves this by applying a provided zipper function. In simpler words, Mono.zip is like having a pre-defined recipe for combining Monos in a specific order.

Syntax:

public static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<?
extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator)

public static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3)

Example:

Mono.zip(Mono.just("Hello"),
Mono.just("World!"), "%s %s"::formatted)
.subscribe(System.out::println);
// Output: Hello World!

Mono.zip(Mono.just("Hello"),
Mono.just("World!"),
Mono.just("Hi"))
.subscribe(System.out::println);
// Output: [Hello,World!,Hi]

zipWith

Mono.zipWith() acts similar to Mono.zip() but with more flexibility for combining Mono sources. It waits for only the first provided Mono to emit a value, then uses the zipper function to determine how to combine it with another source (which can be a static value or a new Mono). This allows for conditional logic and dynamic behaviour. In simpler terms, Mono.zipWith allows to adapt the recipe based on the first Mono and potentially add new Mono dynamically.

Syntax:

public final <T2> Mono<Tuple2<T, T2>> zipWith(Mono<? extends T2> other)

public final <T2, O> Mono<O> zipWith(Mono<? extends T2> other,
BiFunction<? super T, ? super T2, ? extends O> combinator)

Example:

Mono.just("Hello")
.zipWith(Mono.just("World!"))
.subscribe(System.out::println);
// Output: [Hello,World!]

Mono.just("Hello")
.zipWith(Mono.just("World!"), "%s %s"::formatted)
.subscribe(System.out::println);
// Output: Hello World!

defaultIfEmpty

Emits a default value if the Mono completes without emitting any items.

Syntax:

public final Mono<T> defaultIfEmpty(T defaultV)

Example:

Mono.empty()
.defaultIfEmpty("Default value")
.subscribe(System.out::println);
// Output: Default value

switchIfEmpty

Emits a default value from the given publisher if the Mono completes without emitting any items.

Syntax:

public final Mono<T> switchIfEmpty(Mono<? extends T> alternate)

Example:

Mono.empty()
.switchIfEmpty(Mono.just("Default value"))
.subscribe(System.out::println);
// Output: Default value

Both switchIfEmpty and defaultIfEmpty is used to provide fallback value incase original Mono is completes without emitting any items. The key difference between both the method is, for defaultIfEmpty the value is already available, on the other hand switchIfEmpty help to switch to a completely different Mono.

repeat

Repeats the emission of a Mono a specified number of times or indefinitely.

Syntax:

public final Flux<T> repeat()

public final Flux<T> repeat(long numRepeat)

public final Flux<T> repeat(BooleanSupplier predicate)

Example:

Mono.just("Hello")
.repeat()
.doOnNext(System.out::println)
.subscribe();
// Output: Hello
// Hello
// Hello
// Hello
// ..... indefinitely

Mono.just("Hello")
.repeat(3)
.doOnNext(System.out::println)
.subscribe();
// Output: Hello
// Hello
// Hello
// Hello

Mono.just("Hello")
.repeat(() -> Math.random() < 0.5)
.doOnNext(System.out::println)
.subscribe();
// Output: Hello
// Hello
// Hello
// Hello
// ..... until Math.random() >= 0.5

retry

Automatically retries/re-subscribes a Mono a specified number of times or indefinitely if Mono emit an error signal. This is really helpful to handle error scenarios.

Syntax:

public final Mono<T> retry()

public final Mono<T> retry(long numRetries)

Example:

Mono.just("Hello")
.doOnNext(System.out::println)
.map(str -> {
if (str.equals("Hello")) {
throw new RuntimeException("Error!");
}
return str;
})
.retry()
.subscribe(System.out::println, System.err::println);
// Output: Hello
// Hello
// Hello
// .... indefinitely

Mono.just("Hello")
.doOnNext(System.out::println)
.map(str -> {
if (str.equals("Hello")) {
throw new RuntimeException("Error!");
}
return str;
})
.retry(2)
.subscribe(System.out::println, System.err::println);
// Output: Hello
// Hello
// Hello
// Error: java.lang.RuntimeException: Error!

Note: we will discuss more real life use cases in later chapters.

retryWhen

retryWhen behaves similar to Mono.retry but it comes with better flexibility. This is really helpful to handle error scenarios conditionally.

Syntax:

public final Mono<T> retryWhen(Retry retrySpec)

Example:

Mono.just("Hello")
.doOnNext(System.out::println)
.map(str -> {
if (str.equals("Hello")) {
throw new RuntimeException("Error!");
}
return str;
})
.retryWhen(Retry.indefinitely()
.filter(throwable -> throwable instanceof RuntimeException))
.subscribe(System.out::println, System.err::println);
// Output: Hello
// Hello
// Hello
// .... indefinitely

Note: we will discuss more real life use cases in later chapters.

defer

Mono.defer() is used to create a Mono source dynamically, delaying the creation of the actual Mono until the moment of subscription. As defer use lazy initialization, it helps to improve the performance. Additionally, the lazily initialized Mono can be dynamically constructed based on context.

Syntax:

public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)

Example:

Mono.defer(() -> Mono.just("Hello"))
.subscribe(System.out::println);
// Output: Hello

Mono.defer(() -> Math.random() > 0.5 ?
Mono.just("Hello") :
Mono.just("World"))
.subscribe(System.out::println);
// Output: either Hello or World based on the value of Math.random()

contextWrite

Flux.contextWrite() is used to manipulate the context associated with a Flux source similar to Mono.contextWrite(). This context can be accessed by downstream operators that might need additional information for processing.

Syntax:

public final Flux<T> contextWrite(ContextView contextToAppend)

Example:

Flux.just("John", "Kurt")
.contextWrite(Context.of("traceId", UUID.randomUUID()))
.subscribe(System.out::println);
// Output: John
// Kurt

Flux.just("John", "Kurt")
.flatMap(username ->
Flux.deferContextual(contextView ->
Mono.just("%s -> %s".formatted(username, contextView.get("userId"))))
.contextWrite(Context.of("userId", "user-id-%s".formatted(username.toLowerCase()))))
.subscribe(System.out::println);
// Output: John -> user-id-john
// Kurt -> user-id-kurt
To implement MDC in Spring Reactive we need to make use of contextWrite

cache

Mono.cache() is used to cache (remember) last emitted signal (success value or error) and replays it to future subscribers. This is particularly useful for optimizing performance by avoiding redundant execution of expensive operations represented by the Mono.

By default, cache remembers the last emitted signal indefinitely but in more advanced uses we can customise it as per the requirements (we will discuss in details in later chapters).

Syntax:

public final Mono<T> cache()

public final Mono<T> cache(Duration ttl)

Example:

final Mono<String> cachedMono = Mono.fromCallable(() -> {
System.out.println("Simulate a slow operation");
Thread.sleep(500);
return "World";
}).cache();

cachedMono.subscribe(data -> System.out.printf("First subscription: Hello %s%n", data));
cachedMono.subscribe(data -> System.out.printf("Second subscription: Hello %s%n", data));
// Output: Simulate a slow operation
// First subscription: Hello World
// Second subscription: Hello World


final Mono<String> cachedMono = Mono.fromCallable(() -> {
System.out.println("Simulate a slow operation");
Thread.sleep(500);
return "World";
}).cache(Duration.ofMillis(200));

cachedMono.subscribe(data -> System.out.printf("First subscription: Hello %s%n", data));
Thread.sleep(300);
cachedMono.subscribe(data -> System.out.printf("Second subscription: Hello %s%n", data));
// Output: Simulate a slow operation
// First subscription: Hello World
// Simulate a slow operation
// Second subscription: Hello World

Advanced Operators of Flux

flatMap

Flux.flatMap() is similar to Mono.map() used for transforming elements within a Flux stream and creating a new Flux with potentially more elements.

Syntax:

public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Example:

Flux.just("Hello", "World")
.flatMap(str -> Flux.fromArray(str.split("")))
.doOnNext(System.out::println)
.subscribe();
// Output: H
// e
// l
// l
// o
// w
// o
// r
// l
// d

concatWith

Flux.concatWith() concatenates two Flux stream, emitting elements from the first Flux until it completes, then emitting elements from the second Flux.

Syntax:

public final Flux<T> concatWith(Publisher<? extends T> other)

Example:

Flux.just("Hello", "World")
.concatWith(Flux.just("Hi", "Human"))
.doOnNext(System.out::println)
.subscribe();
// Output: Hello
// World
// Hi
// Human

mergeWith

Flux.mergeWith() concatenates two Flux stream similar to Flux.concatWith(), but the major difference is mergeWith eagerly subscribe both the Flux stream where concatWith sequentially subscribe the Fluxs.

Syntax:

public final Flux<T> mergeWith(Publisher<? extends T> other)

Example:

Flux.just("Hello", "World").delayElements(Duration.ofMillis(100))
.mergeWith(Flux.just("Hi", "Human"))
.doOnNext(System.out::println)
.subscribe();
// Output: Hi
// Human
// Hello
// World

zipWith

Flux.zipWith() pairs up the latest elements of Flux sequences into a single Tuple whenever each Flux emits an element. Additionally, we can provide combinator to customise the elements pairs up process.

Syntax:

public final <T2> Flux<Tuple2<T, T2>> zipWith(Publisher<? extends T2> source2)

public final <T2, V> Flux<V> zipWith(Publisher<? extends T2> source2,
final BiFunction<? super T, ? super T2, ? extends V> combinator)

Example:

Flux.just("Hello", "World")
.zipWith(Flux.just("Hi", "Human"))
.doOnNext(System.out::println)
.subscribe();
// Output: [Hello,Hi]
// [World,Human]

Flux.just("Hello", "World")
.zipWith(Flux.just("Hi", "Human"), "%s, %s"::formatted)
.doOnNext(System.out::println)
.subscribe();
// Output: Hello, Hi
// World, Human

Flux.just("Hello")
.zipWith(Flux.just("Hi", "Human"))
.doOnNext(System.out::println)
.subscribe();
// Output: [Hello,Hi]
zipWith stops processing when either of the streams emits an error or completes. This ensures the resulting stream's termination state reflects the first stream to reach that point.

defaultIfEmpty

Emits a default value if the Flux completes without emitting any items, similar to Mono.defaultIfEmpty().

Syntax:

public final Flux<T> defaultIfEmpty(T defaultV)

Example:

Flux.empty()
.defaultIfEmpty("Default value")
.subscribe(System.out::println);
// Output: Default value

switchIfEmpty

Emits a default value from the given publisher if the Flux completes without emitting any items.

Syntax:

public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)

Example:

Flux.empty()
.switchIfEmpty(Flux.just("Default", "value"))
.subscribe(System.out::println);
// Output: Default
// value

Flux.empty()
.switchIfEmpty(Mono.just("Default value"))
.subscribe(System.out::println);
// Output: Default value

Both switchIfEmpty and defaultIfEmpty is used to provide fallback value incase original Flux is completes without emitting any items. The key difference between both the method is, for defaultIfEmpty the value is already available, on the other hand switchIfEmpty help to switch to a completely different Flux.

repeat

Repeats the emission of a Flux for a specified number of times or indefinitely.

Syntax:

public final Flux<T> repeat()

public final Flux<T> repeat(long numRepeat)

public final Flux<T> repeat(BooleanSupplier predicate)

Example:

Flux.just("Hello", "World")
.repeat()
.doOnNext(System.out::println)
.subscribe();
// Output: Hello
// World
// Hello
// World
// Hello
// World
// ..... indefinitely

Flux.just("Hello", "World")
.repeat(2)
.doOnNext(System.out::println)
.subscribe();
// Output: Hello
// World
// Hello
// World
// Hello
// World

Flux.just("Hello", "World")
.repeat(() -> Math.random() < 0.5)
.doOnNext(System.out::println)
.subscribe();
// Output: Hello
// World
// Hello
// World
// Hello
// World
// ..... until Math.random() >= 0.5

retry

Automatically retries/re-subscribes a Flux a specified number of times or indefinitely if Mono emit an error signal. This is really helpful to handle error scenarios.

Syntax:

public final Flux<T> retry()

public final Flux<T> retry(long numRetries)

Example:

Flux.just("Hello", "World")
.doOnNext(System.out::println)
.map(str -> {
throw new RuntimeException("%s Error!".formatted(str));
})
.retry()
.subscribe(str -> System.out.printf("Subscribe consumer %s%n", str),
System.err::println);
// Output: Hello
// Hello
// Hello
// .... indefinitely

Flux.just("Hello", "World")
.doOnNext(System.out::println)
.map(str -> {
if (str.equals("World")) {
throw new RuntimeException("%s Error!".formatted(str));
}
return str;
})
.retry(2)
.subscribe(str -> System.out.printf("Subscribe consumer %s%n", str),
System.err::println);
// Output: Hello
// Subscribe consumer Hello
// World
// Hello
// Subscribe consumer Hello
// World
// Hello
// Subscribe consumer Hello
// World
// java.lang.RuntimeException: World Error!

Note: we will discuss more real life use cases in later chapters.

retryWhen

retryWhen behaves similar to Flux.retry but it comes with better flexibility. This is really helpful to handle error scenarios conditionally.

Syntax:

public final Flux<T> retryWhen(Retry retrySpec)

Example:

Flux.just("Hello", "World")
.doOnNext(System.out::println)
.map(str -> {
if (str.equals("World")) {
throw new IllegalArgumentException("%s Error!".formatted(str));
}
throw new IllegalCallerException("%s Error!".formatted(str));
})
.retryWhen(Retry.indefinitely()
.filter(throwable -> throwable instanceof IllegalCallerException))
.subscribe(str -> System.out.printf("Subscribe consumer %s%n", str),
System.err::println);
// Output: Hello
// Hello
// Hello
// .... indefinitely

Flux.just("Hello", "World")
.doOnNext(System.out::println)
.map(str -> {
if (str.equals("World")) {
throw new IllegalArgumentException("%s Error!".formatted(str));
}
throw new IllegalCallerException("%s Error!".formatted(str));
})
.retryWhen(Retry.indefinitely()
.filter(throwable -> throwable instanceof IllegalArgumentException))
.subscribe(str -> System.out.printf("Subscribe consumer %s%n", str),
System.err::println);
// Output: Hello
// java.lang.IllegalCallerException: Hello Error!

Note: we will discuss more real life use cases in later chapters.

defer

Flux.defer() is used to create a Flux source dynamically, delaying the creation of the actual Flux until the moment of subscription, similar to Mono.defer().

Syntax:

public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier)

Example:

Flux.defer(() -> Flux.just("Hello", "World"))
.subscribe(System.out::println);
// Output: Hello
// World

Flux.defer(() -> Math.random() > 0.5 ?
Flux.just("Hello", "World") :
Flux.just("Hi", "Human"))
.subscribe(System.out::println);
// Output: either Hello World or Hi Human based on the value of Math.random()

cache

Flux.cache() is used to cache (remember) last emitted sequences (success value or error) and replays it to future subscribers. This is particularly useful for optimizing performance by avoiding redundant execution of expensive operations represented by the Flux.

By default, cache remembers the last emitted signal indefinitely but in more advanced uses we can customise it as per the requirements (we will discuss in details in later chapters).

Syntax:

public final Flux<T> cache()

public final Flux<T> cache(Duration ttl)

Example:

final Flux<String> cachedFlux = Flux.just("John", "Kurt")
.doOnNext(username -> System.out.printf("Original element %s%n", username))
.delayElements(Duration.ofMillis(500))
.cache();

cachedFlux.subscribe(data -> System.out.printf("First subscription: Hello %s%n", data));
cachedFlux.subscribe(data -> System.out.printf("Second subscription: Hello %s%n", data));
// Output: Original element John
// First subscription: Hello John
// Second subscription: Hello John
// Original element Kurt
// First subscription: Hello Kurt
// Second subscription: Hello Kurt


final Flux<String> cachedFlux = Flux.just("John", "Kurt")
.doOnNext(username -> System.out.printf("Original element %s%n", username))
.delayElements(Duration.ofMillis(100))
.cache(Duration.ofMillis(200));

cachedFlux.subscribe(data -> System.out.printf("First subscription: Hello %s%n", data));
Thread.sleep(500);
cachedFlux.subscribe(data -> System.out.printf("Second subscription: Hello %s%n", data));
// Output: Original element John
// First subscription: Hello John
// Original element Kurt
// First subscription: Hello Kurt
// Original element John
// Second subscription: Hello John
// Original element Kurt
// Second subscription: Hello Kurt

Conclusion

In this article, we’ve explored many powerful operators that can elevate reactive programming skills to new heights. From merging, combining, and transforming data streams to handling errors, retries, and caching, these operators provide the building blocks for creating robust, resilient, and high-performance applications.

By mastering these operators, you can easily craft reactive pipelines that efficiently handle complex asynchronous scenarios, such as data processing, network communications, and event-driven architectures. Whether you’re building microservices, web applications, or backend systems, the versatility of Mono and Flux operators empowers you to design elegant solutions that scale gracefully and deliver exceptional user experiences.

This article is a segment of the Cooking up Reactivity: The Spring Way series. To explore the entire series, click here.

Originally posted on medium.com.