Now that we understand the building blocks from the previous chapter, let's look at the basic operators and how to use them.

Basic operators of Mono

map

Mono.map() is mainly used to transform data: when you have a value and want to convert it into another by applying synchronous transformation logic, you can use map.

Syntax:

public final <R> Mono<R> map(Function<? super T,? extends R> mapper)

Example:

Mono.just(2)
// Add 5 to the emitted value (2),
// so once map executes it'll transform 2 into 7
.map(number -> number + 5)
.subscribe();

filter

Mono.filter() is used to filter out the element if it does not comply with a given condition. When the element fails the test, the result is an empty Mono.

Syntax:

public final Mono<T> filter(Predicate<? super T> tester)

Example:

Mono.just(2)
.map(number -> number + 5)
// Emit the value if the transformed data is an even number,
// otherwise return an empty Mono. In this case, it'll return
// an empty Mono because 7 is not evenly divisible by 2
.filter(number -> number % 2 == 0)
.subscribe();
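
To make the empty result visible, here is a minimal sketch that wraps the same pipeline in a helper and reads the outcome with blockOptional(). The class name MonoFilterSketch and the method addFiveKeepEven are hypothetical names chosen for illustration, and blocking is used here only so the result can be inspected.

```java
import java.util.Optional;
import reactor.core.publisher.Mono;

public class MonoFilterSketch {

    // Hypothetical helper: adds 5, then keeps the result only if it is even.
    static Optional<Integer> addFiveKeepEven(int input) {
        return Mono.just(input)
                .map(number -> number + 5)
                .filter(number -> number % 2 == 0)
                // Used only to inspect the outcome in this demo:
                // an empty Mono becomes an empty Optional.
                .blockOptional();
    }

    public static void main(String[] args) {
        // 2 + 5 = 7 is odd, so the Mono is empty
        System.out.println(addFiveKeepEven(2).isPresent());
        // 3 + 5 = 8 is even, so the value passes through
        System.out.println(addFiveKeepEven(3).isPresent());
    }
}
```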

doOnNext

Mono.doOnNext() is used to perform a side effect on the emitted element.

Syntax:

public final Mono<T> doOnNext(Consumer<? super T> onNext)

Example:

Mono.just(2)
.map(number -> number * 5)
// It'll print "The value is 10" on the console
.doOnNext(number -> log.info("The value is {}", number))
.subscribe();
Note: doOnNext() gets invoked only if the Mono emits an onNext signal.

doOnError

Mono.doOnError() is used to perform side effects if the Mono emits an error signal.

Additionally, doOnError can perform side effects for specific kinds of errors only, either by naming the error type directly or by providing a predicate.

Syntax:

public final Mono<T> doOnError(Consumer<? super Throwable> onError)

public final <E extends Throwable> Mono<T> doOnError(Class<E> exceptionType,
Consumer<? super E> onError)

public final Mono<T> doOnError(Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError)

Example:

// Always log error
Mono.just(2)
.map(number -> number / 0)
// Integer division by 0 throws an ArithmeticException, so this
// logs "Unable to perform task" with the error details.
// This error is always logged, irrespective of the error type
.doOnError(throwable -> log.error("Unable to perform task", throwable))
.doOnNext(number -> log.info("The value is {}", number))
.subscribe();

// log only if ArithmeticException occurs
Mono.just(2)
.map(number -> number / 0)
// Integer division by 0 throws an ArithmeticException, so this
// logs "Unable to perform arithmetic operation" with the error details.
// If any other type of error occurs, this doOnError is skipped
.doOnError(ArithmeticException.class, throwable -> log.error("Unable to perform arithmetic operation", throwable))
.doOnNext(number -> log.info("The value is {}", number))
.subscribe();

// log only if exception is not an ArithmeticException
Mono.just(2)
.map(number -> number / 0)
// Integer division by 0 throws an ArithmeticException, so the predicate
// returns false and this doOnError is skipped.
// For any other type of error, it logs "Unable to perform task" with the error details
.doOnError(throwable -> !(throwable instanceof ArithmeticException), throwable -> log.error("Unable to perform task", throwable))
.doOnNext(number -> log.info("The value is {}", number))
.subscribe();
Note: doOnError() gets invoked only if the Mono emits an onError signal. You can also chain multiple doOnError calls as per your requirement.
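
One point worth seeing in action is that doOnError only observes the error and does not handle it, so the error still reaches the subscriber. The sketch below records which callbacks run and in what order. MonoDoOnErrorSketch and divideByZero are hypothetical names, and a plain list stands in for the logger used in the examples above.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class MonoDoOnErrorSketch {

    // Hypothetical helper: returns the callbacks that ran, in order.
    static List<String> divideByZero() {
        List<String> events = new ArrayList<>();
        Mono.just(2)
                .map(number -> number / 0) // throws ArithmeticException
                .doOnError(ArithmeticException.class,
                        e -> events.add("doOnError(ArithmeticException)"))
                .doOnError(e -> events.add("doOnError(any)"))
                // Skipped, because no onNext signal is emitted
                .doOnNext(number -> events.add("doOnNext"))
                .subscribe(
                        number -> events.add("value"),
                        // The error still arrives here after both doOnError calls
                        e -> events.add("subscriber error: " + e.getClass().getSimpleName()));
        return events;
    }

    public static void main(String[] args) {
        divideByZero().forEach(System.out::println);
    }
}
```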

doOnSuccess

Mono.doOnSuccess() is used to perform side effects when the Mono completes successfully.

Syntax:

public final Mono<T> doOnSuccess(Consumer<? super T> onSuccess)

Example:

Mono.just(2)
.map(number -> number * 5)
// It'll print "The value is 10" on the console
.doOnSuccess(number -> log.info("The value is {}", number))
.subscribe();
Note: the doOnSuccess() consumer receives null in case of an empty Mono.
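
The difference between doOnNext and doOnSuccess on an empty Mono can be sketched as follows. MonoDoOnSuccessSketch and observe are hypothetical names, and events are collected in a list instead of being logged so the outcome is easy to inspect.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class MonoDoOnSuccessSketch {

    // Hypothetical helper: records which side-effect callbacks were invoked.
    static List<String> observe(Mono<Integer> source) {
        List<String> events = new ArrayList<>();
        source
                .doOnNext(value -> events.add("doOnNext: " + value))
                .doOnSuccess(value -> events.add("doOnSuccess: " + value))
                .subscribe();
        return events;
    }

    public static void main(String[] args) {
        // A Mono with a value triggers both callbacks
        System.out.println(observe(Mono.just(2).map(number -> number * 5)));
        // An empty Mono skips doOnNext, and doOnSuccess receives null
        System.out.println(observe(Mono.empty()));
    }
}
```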

subscribe

As we saw previously, in reactive programming instructions do not get executed until they are required. subscribe() is one of the ways to instruct the Mono publisher to start the execution. It returns a Disposable rather than the emitted value, so results are handled through the callbacks you pass in.

Syntax:

public final Disposable subscribe()

public final Disposable subscribe(Consumer<? super T> consumer)

public final Disposable subscribe(@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer)

Example:

Mono.just(2)
.map(number -> number * 5)
// It'll print "The value is 10" on the console
.subscribe(number -> log.info("The value is {}", number));

Mono.just(2)
.map(number -> number * 5)
// It'll log as follows
// The value is 10
// Completed
.subscribe(
number -> log.info("The value is {}", number),
throwable -> log.error("Unable to perform task", throwable),
() -> log.info("Completed")
);
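
The "nothing runs until it is required" behaviour can be checked with a small sketch. It assumes a counter that is incremented inside Mono.fromCallable; LazySubscribeSketch and callsBeforeAndAfterSubscribe are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class LazySubscribeSketch {

    // Hypothetical helper: returns how often the source ran
    // before and after subscribe() was called.
    static int[] callsBeforeAndAfterSubscribe() {
        AtomicInteger calls = new AtomicInteger();

        // Building the pipeline does not execute anything yet
        Mono<Integer> source = Mono.fromCallable(calls::incrementAndGet);
        Mono<Integer> pipeline = source.map(number -> number * 5);
        int before = calls.get();

        // subscribe() triggers the execution of the pipeline
        pipeline.subscribe();
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = callsBeforeAndAfterSubscribe();
        System.out.println("before subscribe: " + calls[0] + ", after subscribe: " + calls[1]);
    }
}
```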

block

Mono.block() is similar to Mono.subscribe(), but the main difference is that block() blocks the calling thread indefinitely until the publisher emits a value or completes, and then returns that value. We can also set a timeout using Mono.block(Duration timeout): if the publisher does not complete within the provided time, the timeout expires and a RuntimeException is raised with a TimeoutException as its cause.

In case of an empty Mono, it returns null. If the Mono emits an error signal, block() throws it: an unchecked exception is thrown as is, while a checked exception is wrapped in a RuntimeException.

Syntax:

@Nullable
public T block()

@Nullable
public T block(Duration timeout)

Example:

Mono.just(2)
// Add 5 to the emitted value (2),
// so once map executes it'll transform 2 into 7
.map(number -> number + 5)
.block();
Note: it is not recommended to use block() in reactive programming.
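
The edge cases of block() can be explored with a short sketch: an empty Mono, an unchecked error, a checked error, and a timeout. It reflects the behaviour of Reactor as I understand it (unchecked exceptions come out unchanged, checked ones arrive wrapped), so treat it as an illustration to verify against your version. BlockSketch and its method names are hypothetical.

```java
import java.io.IOException;
import java.time.Duration;
import reactor.core.publisher.Mono;

public class BlockSketch {

    // An empty Mono makes block() return null
    static Integer blockEmpty() {
        return Mono.<Integer>empty().block();
    }

    // An unchecked exception is rethrown by block() as the same type
    static boolean uncheckedIsRethrown() {
        try {
            Mono.error(new IllegalArgumentException("boom")).block();
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    // A checked exception arrives wrapped in a RuntimeException
    static boolean checkedIsWrapped() {
        try {
            Mono.error(new IOException("io failure")).block();
            return false;
        } catch (RuntimeException e) {
            return e.getCause() instanceof IOException;
        }
    }

    // Mono.never() emits nothing, so the timeout expires
    static boolean timesOut() {
        try {
            Mono.never().block(Duration.ofMillis(100));
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("empty -> " + blockEmpty());
        System.out.println("unchecked rethrown -> " + uncheckedIsRethrown());
        System.out.println("checked wrapped -> " + checkedIsWrapped());
        System.out.println("timeout -> " + timesOut());
    }
}
```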

Basic operators of Flux

map

Flux.map() is used to transform elements with a synchronous mapper, similar to Mono.map(), but Flux.map() transforms every element of the Flux.

Syntax:

public final <V> Flux<V> map(Function<? super T,? extends V> mapper)

Example:

Flux.just(2, 4)
// Add 5 to each element,
// so once map executes it'll emit 7 and 9
.map(number -> number + 5)
.subscribe();

filter

Flux.filter() is used to filter out elements that do not comply with a given condition, similar to Mono.filter().

Syntax:

public final Flux<T> filter(Predicate<? super T> p)

Example:

Flux.just(2, 7, 3, 6)
.map(number -> number + 5)
// If the transformed element is an even number
// emit the element otherwise ignore.
// In this case, it'll return 12, 8
.filter(number -> number % 2 == 0)
.subscribe();

doOnNext

Flux.doOnNext() is used to perform a side effect on each emitted element.

Syntax:

public final Flux<T> doOnNext(Consumer<? super T> onNext)

Example:

Flux.just(2, 7, 3, 6)
.map(number -> number + 5)
// It'll print the following lines on console
// "The value is 7"
// "The value is 12"
// "The value is 8"
// "The value is 11"
.doOnNext(number -> log.info("The value is {}", number))
.subscribe();
Note: doOnNext() gets invoked only if the Flux emits an onNext signal. In case of an empty Flux, doOnNext is skipped.

doOnError

Flux.doOnError() is used to perform side effects on an error signal, similar to Mono.doOnError(). By default, a Flux stops processing the remaining elements on the first onError signal. Error-handling operators can change how failures are dealt with, so that more than one failing element is handled (we'll deep dive in later chapters).

Syntax:

public final Flux<T> doOnError(Consumer<? super Throwable> onError)

public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType,
Consumer<? super E> onError)

public final Flux<T> doOnError(Predicate<? super Throwable> predicate,
Consumer<? super Throwable> onError)

Example:

// Always log error
Flux.just(10, 0, 3)
.map(number -> 20 / number)
// It first processes `20 / 10` and logs "The value is 2",
// then it tries to process `20 / 0`. Integer division by `0` is not allowed,
// so it throws an error and logs "Unable to perform task" with the error details.
// This error is always logged, irrespective of the error type
.doOnError(throwable -> log.error("Unable to perform task", throwable))
.doOnNext(number -> log.info("The value is {}", number))
.subscribe();

// log only if ArithmeticException occurs
Flux.just(10, 0, 3)
.map(number -> 20 / number)
// It first processes `20 / 10` and logs "The value is 2",
// then it tries to process `20 / 0`. Integer division by `0` is not allowed,
// so it throws an ArithmeticException and
// logs "Unable to perform arithmetic operation" with the error details. If any
// other type of error occurs, this doOnError is skipped.
// The generic doOnError on the following line logs every error, including this one
.doOnError(ArithmeticException.class, throwable -> log.error("Unable to perform arithmetic operation", throwable))
.doOnError(throwable -> log.error("Unable to perform task", throwable))
.doOnNext(number -> log.info("The value is {}", number))
.subscribe();

// log only if exception is not an ArithmeticException
Flux.just(10, 0, 3)
.map(number -> 20 / number)
// It first processes `20 / 10` and logs "The value is 2",
// then it tries to process `20 / 0`. Integer division by `0` is not allowed,
// so it throws an ArithmeticException, and because of that this doOnError is skipped.
// For any other type of error, it logs "Unable to perform task" with the error details
.doOnError(throwable -> !(throwable instanceof ArithmeticException), throwable -> log.error("Unable to perform task", throwable))
.doOnNext(number -> log.info("The value is {}", number))
.subscribe();
Note: doOnError() gets invoked only if the Flux emits an onError signal. You can also chain multiple doOnError calls as per your requirement.
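
The default "stop at the first error" behaviour can be observed by recording the signals in a list. In this sketch, FluxDoOnErrorSketch and divideTwentyBy are hypothetical names, and the list replaces the logger from the examples above.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxDoOnErrorSketch {

    // Hypothetical helper: divides 20 by each divisor and records the signals seen.
    static List<String> divideTwentyBy(Integer... divisors) {
        List<String> events = new ArrayList<>();
        Flux.just(divisors)
                .map(number -> 20 / number)
                .doOnNext(value -> events.add("value " + value))
                .doOnError(ArithmeticException.class,
                        e -> events.add("error " + e.getClass().getSimpleName()))
                // Empty callbacks: the side effects above do the recording
                .subscribe(value -> { }, e -> { });
        return events;
    }

    public static void main(String[] args) {
        // The element after the failing one (3) is never processed
        System.out.println(divideTwentyBy(10, 0, 3));
    }
}
```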

doOnComplete

Flux.doOnComplete() is used to perform side effects when the Flux completes successfully.

Syntax:

public final Flux<T> doOnComplete(Runnable onComplete)

Example:

Flux.just(10, 0, 3)
.map(number -> 20 * number)
// Here the logs will be as follows
// The value is 200
// The value is 0
// The value is 60
// Process completed
.doOnNext(number -> log.info("The value is {}", number))
.doOnComplete(() -> log.info("Process completed"))
.subscribe();
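
As a rough illustration of when doOnComplete fires, the sketch below compares a Flux with values, an empty Flux, and a Flux that fails. FluxDoOnCompleteSketch and observe are hypothetical names, and signals are collected in a list rather than logged.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxDoOnCompleteSketch {

    // Hypothetical helper: records onNext and onComplete side effects.
    static List<String> observe(Flux<Integer> source) {
        List<String> events = new ArrayList<>();
        source
                .doOnNext(value -> events.add("value " + value))
                .doOnComplete(() -> events.add("complete"))
                // Empty callbacks: the side effects above do the recording
                .subscribe(value -> { }, e -> { });
        return events;
    }

    public static void main(String[] args) {
        // Values followed by a completion signal
        System.out.println(observe(Flux.just(10, 0, 3).map(number -> 20 * number)));
        // An empty Flux still completes, so only doOnComplete runs
        System.out.println(observe(Flux.empty()));
        // A failing Flux terminates with an error, so doOnComplete is skipped
        System.out.println(observe(Flux.just(10, 0, 3).map(number -> 20 / number)));
    }
}
```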

subscribe

Flux.subscribe() is similar to Mono.subscribe(): it instructs the Flux publisher to start the execution.

Syntax:

public final Disposable subscribe()

public final Disposable subscribe(Consumer<? super T> consumer)

public final Disposable subscribe(@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer)

Example:

Flux.just(2, 4)
.map(number -> number * 5)
// It'll log as follows
// The value is 10
// The value is 20
.subscribe(number -> log.info("The value is {}", number));

Flux.just(2, 4)
.map(number -> number * 5)
// It'll log as follows
// The value is 10
// The value is 20
// Completed
.subscribe(
number -> log.info("The value is {}", number),
throwable -> log.error("Unable to perform task", throwable),
() -> log.info("Completed")
);

Conclusion

In this article, we explored essential operators such as map, filter, and doOnNext, among others. Proficiency in these core concepts establishes a sturdy groundwork for developing scalable and responsive applications in today’s software landscape. While we’ve covered key operators for Mono and Flux publishers, there are many more to discover in subsequent parts of this series.

This article is a segment of the Cooking up Reactivity: The Spring Way series.

Originally posted on medium.com.