Reactive Programming in Java

Updated: Sep 20


In this post, we will look at the basics of reactive programming: what it is, what its core features are, and how to handle errors. The post leans more toward explanation than toward coding; the hands-on part will come later.


According to Wikipedia, Reactive Programming is defined as:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).

The reactive programming paradigm is often presented in object-oriented languages as an extension of the Observer design pattern. It can also be compared to the familiar Iterator design pattern, with one major difference: an Iterator is pull-based, while reactive streams are push-based.


Reactive streams follow the Publisher-Subscriber pattern, where the publisher notifies the subscribers of newly available values as they come, and this push aspect is the key to being reactive. Also, operations applied to pushed values are expressed declaratively rather than imperatively: the programmer expresses the logic of the computation rather than describing its exact control flow.


In addition to pushing values, the error-handling and completion aspects are also covered in a well defined manner. A Publisher can push new values to its Subscriber (by calling onNext) but can also signal an error (by calling onError) or completion (by calling onComplete). Both errors and completion terminate the sequence.
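To make the three signals concrete, here is a minimal sketch that uses the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams Publisher and Subscriber, so no extra library is needed. The class SignalsSketch and the helpers quotients and signalsOf are names made up for this illustration, and the publisher is deliberately naive and synchronous; it is not how Reactor implements its publishers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SignalsSketch {

    /** Naive synchronous publisher (illustrative only): pushes dividend / d for each divisor. */
    static Flow.Publisher<Integer> quotients(int dividend, List<Integer> divisors) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean terminated = false;

            @Override
            public void request(long n) {
                // Push at most n values, stopping early on an error or at the end of the list.
                for (long i = 0; i < n && !terminated && index < divisors.size(); i++) {
                    int value;
                    try {
                        value = dividend / divisors.get(index++);
                    } catch (ArithmeticException e) {
                        terminated = true;
                        subscriber.onError(e);   // terminal signal: nothing follows it
                        return;
                    }
                    subscriber.onNext(value);
                }
                if (!terminated && index == divisors.size()) {
                    terminated = true;
                    subscriber.onComplete();     // terminal signal: nothing follows it
                }
            }

            @Override
            public void cancel() {
                terminated = true;
            }
        });
    }

    /** Subscribes with unbounded demand and records every signal in arrival order. */
    static List<String> signalsOf(Flow.Publisher<Integer> publisher) {
        List<String> log = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { log.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { log.add("onError(" + t.getClass().getSimpleName() + ")"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(signalsOf(quotients(12, List.of(1, 2, 3))));
        // [onNext(12), onNext(6), onNext(4), onComplete]
        System.out.println(signalsOf(quotients(12, List.of(4, 0, 6))));
        // [onNext(3), onError(ArithmeticException)]
    }
}
```

In the second run the division by zero ends the sequence with onError, so the remaining divisor is never processed and onComplete is never called.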


Why Reactive

The official site explains in detail why reactive programming is used.

By writing asynchronous, non-blocking code, you let the execution switch to another active task that uses the same underlying resources and later comes back to the current process when the asynchronous processing has finished.

Reactive libraries, such as Reactor, aim to address the drawbacks of classic asynchronous approaches on the JVM while also focusing on a few additional aspects:

  • Composability and readability

  • Data as a flow manipulated with a rich vocabulary of operators.

  • Nothing happens until you subscribe

  • Backpressure, or the ability for the consumer to signal the producer that the rate of emission is too high.

  • High-level but high-value abstraction that is concurrency-agnostic.
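Two of the points above, the operator vocabulary and "nothing happens until you subscribe", can be sketched in a few lines. This assumes the reactor-core library is on the classpath; the class name LazySketch and the events list are only illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class LazySketch {

    /** Builds a small pipeline and records when things actually happen. */
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Declaring the pipeline only describes the computation; no data flows yet.
        Flux<Integer> pipeline = Flux.range(1, 5)
                .doOnSubscribe(s -> events.add("subscribed"))
                .filter(n -> n % 2 == 1)   // keep odd numbers
                .map(n -> n * 10);         // transform each remaining value

        events.add("assembled");

        // Subscribing is what triggers the flow of data.
        pipeline.subscribe(n -> events.add("value " + n));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
        // [assembled, subscribed, value 10, value 30, value 50]
    }
}
```

Note that "assembled" is recorded before "subscribed": building the chain of operators did not start anything.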


Reactor Core Features

You can add the following Maven dependency to your project to use Reactor's reactive features.

<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.22</version>
</dependency>

In the reactive paradigm, there are two types of response:

- a single object (Mono), or

- a sequence of objects (Flux)


A Flux object represents a reactive sequence of 0..N items, while a Mono object represents a single-value-or-empty(0..1) result.


Flux, an Asynchronous Sequence of 0..N Items

The following image shows how a `Flux` transforms items:

Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.


Mono, an Asynchronous 0-1 Result

The following image shows how a `Mono` transforms an item:

Mono<T> is a specialized Publisher<T> that emits at most one item via the onNext signal and then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).


Simple ways to create Mono and Flux

  • Mono.empty() and Flux.empty() provide an empty Mono or Flux, respectively.

  • Mono.just(T) and Flux.just(T... data) provide a Mono or a Flux of the given T value(s), respectively.

The following families of factory methods create a Mono or a Flux from a Publisher, Supplier, Stream, array, or Iterable:

Mono.from*

Flux.from*
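As a rough illustration of the factory methods above, the sketch below creates a few Mono and Flux instances and blocks to print their content. It assumes reactor-core is on the classpath; CreationSketch and collect are names invented for this example, and blocking is used only to make the demo easy to read.

```java
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class CreationSketch {

    /** Blocks until the Flux completes and returns its items (demo only; avoid blocking in reactive code). */
    static <T> List<T> collect(Flux<T> flux) {
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        // Empty publishers complete without emitting anything.
        System.out.println(Mono.empty().blockOptional().isPresent());            // false
        System.out.println(collect(Flux.empty()));                               // []

        // just(...) wraps values that already exist.
        System.out.println(Mono.just("a").block());                              // a
        System.out.println(collect(Flux.just(1, 2, 3)));                         // [1, 2, 3]

        // from* adapts other sources.
        System.out.println(Mono.fromSupplier(() -> "lazy").block());             // lazy
        System.out.println(collect(Flux.fromIterable(List.of("x", "y"))));       // [x, y]
        System.out.println(collect(Flux.fromArray(new String[] {"p", "q"})));    // [p, q]
        System.out.println(collect(Flux.fromStream(Stream.of(7, 8))));           // [7, 8]
        System.out.println(collect(Flux.from(Mono.just("z"))));                  // [z]
    }
}
```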

Handling Errors

The items below help you identify your scenario and the operator to use for it.

  • I want to create an erroring sequence: error (Flux|Mono)

      • to replace the completion of a successful Flux: .concat(Flux.error(e))

      • to replace the emission of a successful Mono: .then(Mono.error(e))

      • if too much time elapses between onNexts: timeout (Flux|Mono)

      • lazily: error(Supplier<Throwable>) (Flux|Mono)

  • I want the try/catch equivalent of:

      • throwing: error (Flux|Mono)

      • catching an exception:

          • and falling back to a default value: onErrorReturn (Flux|Mono)

          • and falling back to another Flux or Mono: onErrorResume (Flux|Mono)

          • and wrapping and re-throwing: .onErrorMap(t -> new RuntimeException(t)) (Flux|Mono)

      • the finally block: doFinally (Flux|Mono)

      • the using pattern from Java 7: using (Flux|Mono) factory method

  • I want to recover from errors

      • by falling back:

          • to a value: onErrorReturn (Flux|Mono)

          • to a Publisher or Mono, possibly different ones depending on the error: Flux#onErrorResume and Mono#onErrorResume

      • by retrying

          • with a simple policy (max number of attempts): retry() (Flux|Mono), retry(long) (Flux|Mono)

          • triggered by a companion control Flux: retryWhen (Flux|Mono)

          • using a standard backoff strategy (exponential backoff with jitter): retryWhen(Retry.backoff(…​)) (Flux|Mono) (see also other factory methods in Retry)

  • I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)

      • by throwing a special IllegalStateException: Flux#onBackpressureError

      • by dropping excess values: Flux#onBackpressureDrop

          • except the last one seen: Flux#onBackpressureLatest

      • by buffering excess values (bounded or unbounded): Flux#onBackpressureBuffer

          • and applying a strategy when bounded buffer also overflows: Flux#onBackpressureBuffer with a BufferOverflowStrategy
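A few of the catch-and-recover operators from the list above can be tried out as follows. This is only a sketch, assuming reactor-core is on the classpath; the class ErrorHandlingSketch, the failing() source, and the "boom" error are made up for the illustration, and block() is used just to observe the results.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ErrorHandlingSketch {

    /** Illustrative source: emits 1 and 2, then fails instead of completing. */
    static Flux<Integer> failing() {
        return Flux.just(1, 2).concatWith(Flux.error(new IllegalStateException("boom")));
    }

    /** Catch and fall back to a default value. */
    static List<Integer> withDefault() {
        return failing().onErrorReturn(-1).collectList().block();
    }

    /** Catch and fall back to another publisher. */
    static List<Integer> withFallback() {
        return failing().onErrorResume(e -> Flux.just(10, 20)).collectList().block();
    }

    /** Catch, wrap, and re-throw; the wrapped exception surfaces when blocking. */
    static String wrapped() {
        try {
            failing().onErrorMap(e -> new RuntimeException("wrapped: " + e.getMessage(), e)).blockLast();
            return "no error";
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    /** Retry with a simple policy: the source fails twice, then succeeds on the third subscription. */
    static String retried() {
        AtomicInteger attempts = new AtomicInteger();
        return Mono.fromCallable(() -> {
                    if (attempts.incrementAndGet() < 3) {
                        throw new IllegalStateException("transient failure");
                    }
                    return "ok after " + attempts.get() + " attempts";
                })
                .retry(2)   // re-subscribe up to 2 more times on error
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withDefault());   // [1, 2, -1]
        System.out.println(withFallback());  // [1, 2, 10, 20]
        System.out.println(wrapped());       // wrapped: boom
        System.out.println(retried());       // ok after 3 attempts
    }
}
```

In each case the values emitted before the error are still delivered; the operator only decides what happens in place of the onError signal.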

Refer to the operators needed here
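For the backpressure entries in the list, the following sketch shows onBackpressureDrop with a subscriber that only ever requests three items. It assumes reactor-core (and its org.reactivestreams dependency) is on the classpath; BackpressureSketch and the two lists are illustrative names, and the source is a simple synchronous Flux.range so that the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

final class BackpressureSketch {

    /** Returns two lists: the values the subscriber received and the values that were dropped. */
    static List<List<Integer>> run() {
        List<Integer> received = new ArrayList<>();
        List<Integer> dropped = new ArrayList<>();

        Flux.range(1, 10)
                // Requests everything from upstream and drops what downstream has not asked for.
                .onBackpressureDrop(dropped::add)
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(3);           // the consumer signals it can handle only 3 items
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        received.add(value);  // no further request: demand stays exhausted
                    }
                });

        return List.of(received, dropped);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = run();
        System.out.println("received = " + result.get(0));
        System.out.println("dropped  = " + result.get(1));
    }
}
```

With this setup the first three values should reach the subscriber and the remaining seven should go to the drop callback. Swapping in onBackpressureBuffer or onBackpressureError at the same position is a quick way to compare the strategies.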


References: https://projectreactor.io/docs/core/release/reference


I hope this helps you get a brief idea of reactive programming. Most of the content is taken from the official documentation, with some explanation here and there. More on reactive programming, with some examples, will be covered in upcoming posts.

You can find my GitHub profile. Please suggest topics you would like to see covered and share your feedback. Also, subscribe and show the blog some appreciation if you like it.
