In the first post, we learned about the Reactive Streams specification, so the next logical step is to look at one of its implementations. In our case, this will be Project Reactor, since it comes bundled with Spring Framework, which will be useful in the upcoming posts. We’ve already seen the basic types that the Project Reactor library offers, and in this post we will go through them in more detail.

But, before we dive into it, we have to learn one important concept that will follow us throughout reactive documentation - marble diagrams.

Marble diagrams

Marble diagrams are (interactive) diagrams that explain how a reactive operator works. We’ll get to operators later, but for now, it’s enough to think of them as a simple processing step similar to map or filter.

Mono marble diagram

Regardless of operator complexity or stream size (1 or more elements), every marble diagram has 5 components: the input stream, the input stream’s terminating operation, the operator definition, the output stream, and the output stream’s terminating operation. Keep in mind that these 5 components are not standardized; it’s just a way I like to break down marble diagrams to make them easier to explain.

The input stream is our current stream of data (for example, data from a database, an external service, or a queue) and it’s represented with a green circle in the picture above. That stream will either terminate successfully or terminate with an error. In the picture above, the vertical line at the end of the input stream means the stream ended successfully (onComplete or onSuccess was called). The operator definition is different for each operator, but it will usually have either a lambda written in the box or some shapes that visualize how the input is transformed into the output. The output stream is our input stream transformed by the operator. Depending on the operator, it can have the same number of elements as the input stream (map) or a different number (filter), but, just like the input stream, it can either finish successfully or fail with an error. In the picture above, the output stream ends with an error, which is represented with a red X. In case of success, there would also be a circle, as for the input stream.

The picture above (like all of the others in this post) is taken from Project Reactor’s documentation. Some other sources use a slightly different notation, but, in general, you’ll always be able to spot those 5 components. For example, there’s a great page called RxMarbles which has interactive diagrams for some operators. Its diagrams look a bit different, but they still contain all 5 mentioned components. Don’t let the page title (RxJS Marbles) scare you - as I said before, the operators’ implementation is mostly the same across all Reactive Streams implementations, so that page can be used to get a better feeling for how some operators work.

Mono

Mono<T> is one of the Publisher<T> implementations that Project Reactor provides, and it’s the simplest one. We usually talk about streams of data, but Mono<T> is not really a stream in the conventional sense, since it will produce 0 or 1 item and complete after that (of course, it can still terminate with an error). Because of that, we most often use Mono<T> when fetching a particular resource from somewhere or when checking a cache. Of course, we can put in whichever type we want, even a list if we don’t care about processing each particular item.

Simple creation and consumption of a Mono

The Mono<T> class has a bunch of from* factory methods that let us create it from a lambda expression, but the simplest factory method is just, which wraps a value we already have.

Mono.just(42)
Mono.just("Testing Mono")
Mono.just(listOf(1,2,3))

These three lines only create three Mono<T> objects: Mono<Int>, Mono<String> and Mono<List<Int>>. Nothing is emitted at this point, so printing any of the created objects will not show the wrapped value. In fact, even if we add 100 operators to any of them, nothing will happen. Stream processing starts only when someone subscribes to the stream using the subscribe(subscriber: Subscriber<T>) method, and anything before that call is like a template for whatever we want to do with our stream. Because of that, we say that reactive streams are lazy: no computation happens before someone is ready to receive a result.
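To make the laziness visible, here is a minimal sketch that counts how many times a map lambda runs before and after subscribing. The countMapCalls function and the mapCalls counter are names made up for this illustration, and the sketch assumes reactor-core is on the classpath.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical helper: returns how often the map lambda ran
// before and after subscribing.
fun countMapCalls(): Pair<Int, Int> {
    var mapCalls = 0
    // Assembling the pipeline only describes the work; the lambda is not invoked yet.
    val template = Mono.just(42).map {
        mapCalls++
        it * 2
    }
    val beforeSubscribe = mapCalls
    // Subscribing triggers the pipeline; Mono.just emits synchronously on this thread.
    template.subscribe { println("received $it") }
    return beforeSubscribe to mapCalls
}

fun main() {
    val (before, after) = countMapCalls()
    println("map calls before subscribe: $before, after subscribe: $after")
}
```

With this setup, the counter should stay at 0 until subscribe is called and be 1 afterwards.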

So, to consume streams created above, we have to subscribe to them:

Mono.just(42)
    .subscribe { println(it) }

Mono.just("Testing Mono")
    .subscribe { println(it) }

Mono.just(listOf(1,2,3))
    .subscribe { println(it) }

Which then gives us an output that looks like this:

42
Testing Mono
[1, 2, 3]

There’s one more thing that might not be obvious from the examples above and that can cause big issues in production if not handled properly - any expression passed to the just method is evaluated immediately, before the Mono<T> is even created. Let’s imagine a scenario where we are using a blocking library in our reactive code (that’s not the best idea, but more on that in some other post):

fun oldLibraryBlockingCall(): String {
    Thread.sleep(1000)
    return "Old library string"
}

The code above simulates some work by sleeping for a second and then returns a result. With our current knowledge, you might be tempted to write something like Mono.just(oldLibraryBlockingCall()) and subscribe to it at some point, but this is not a good idea. To see what’s actually happening, we can put that line in the main method without subscribing to it and add some logging to the method definition:

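import reactor.core.publisher.Mono

fun main() {
    // oldLibraryBlockingCall() runs right here, while the argument to just is evaluated
    Mono.just(oldLibraryBlockingCall())
}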

fun oldLibraryBlockingCall(): String {
    println("Calling blocking method")
    Thread.sleep(1000)
    return "Old library string"
}

If we execute the given code, we can see that the logging statement is printed out, which is not what we want because there are no subscribers yet. To prevent this from happening, we usually use the fromSupplier factory method which, as the name suggests, accepts a Supplier<T>. The supplier wraps the call to our old library method, and that call happens only when someone subscribes to the Mono<T>.

So, if we update our main method:

fun main() {
    Mono.fromSupplier { oldLibraryBlockingCall() }
}

we will see that nothing is printed out, but if we subscribe to it, the log statement will be printed as well as the string that the method returns:

fun main() {
    Mono.fromSupplier { oldLibraryBlockingCall() }
        .subscribe { println(it) }
}
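One consequence worth seeing in code is that the supplier runs once per subscription, not once per Mono<T>. The sketch below replaces the sleeping method with a hypothetical countedBlockingCall that only counts its invocations; blockingCalls and supplierCallCounts are also names invented for this example.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import reactor.core.publisher.Mono

// Hypothetical stand-in for the blocking library call; it only counts invocations.
val blockingCalls = AtomicInteger()

fun countedBlockingCall(): String {
    blockingCalls.incrementAndGet()
    return "Old library string"
}

// Returns the number of invocations right after creation and after two subscriptions.
fun supplierCallCounts(): Pair<Int, Int> {
    blockingCalls.set(0)
    val mono = Mono.fromSupplier { countedBlockingCall() }
    val afterCreation = blockingCalls.get()
    // Each subscription invokes the supplier again.
    mono.subscribe { println(it) }
    mono.subscribe { println(it) }
    return afterCreation to blockingCalls.get()
}

fun main() {
    val (afterCreation, afterSubscriptions) = supplierCallCounts()
    println("calls after creation: $afterCreation, after two subscriptions: $afterSubscriptions")
}
```

If the underlying call is expensive, this is something to keep in mind when the same Mono<T> is subscribed to more than once.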

Defer

There’s one nice little factory method called defer which does the same thing as fromSupplier (it defers execution until subscription), but it’s used when the return type is also Mono<T>. So, if we change oldLibraryBlockingCall() so that it returns a Mono<String> then we’d have to use Mono.defer { oldLibraryBlockingCall() } to get the same result.

fun main() {
    Mono.defer { oldLibraryBlockingCall() }
}

fun oldLibraryBlockingCall(): Mono<String> {
    println("Calling blocking method")
    Thread.sleep(1000)
    return Mono.just("Old library string")
}
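The difference between calling a Mono-returning function directly and wrapping it in defer can be sketched as follows. The lookup function and the lookupLog list are hypothetical and exist only to record when the function body actually runs.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical Mono-returning function that records every invocation.
val lookupLog = mutableListOf<String>()

fun lookup(): Mono<String> {
    lookupLog.add("lookup called")
    return Mono.just("value")
}

// Returns the number of lookup() invocations at three points in time.
fun deferCallCounts(): List<Int> {
    lookupLog.clear()
    val eager = lookup()                    // the function body runs immediately
    val afterEager = lookupLog.size
    val deferred = Mono.defer { lookup() }  // the function body does not run yet
    val afterDefer = lookupLog.size
    deferred.subscribe { println("deferred: $it") }  // now lookup() is invoked
    eager.subscribe { println("eager: $it") }        // no new invocation here
    return listOf(afterEager, afterDefer, lookupLog.size)
}

fun main() {
    println(deferCallCounts())
}
```

The direct call pays the cost up front even if nobody ever subscribes, while the deferred variant postpones it until a subscriber shows up.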

Create

The create method gives us full control over Mono<T> creation and provides methods to manually push data to it. This is especially useful if we have an asynchronous API that we want to use in reactive code. Let’s imagine there’s an asynchronous method that accepts a callback:

import kotlin.random.Random

fun executeRequest(callback: Callback<String>) {
    // Randomly report either a result or a failure through the callback
    if (Random.nextBoolean()) {
        callback.success("Async success")
    } else {
        callback.error(Throwable("Async error"))
    }
}

interface Callback<T> {
    fun success(data: T)
    fun error(t: Throwable)
}

We can then wrap the call to the executeRequest method with the create factory method. This gives us a MonoSink<T> object which has success and error methods that we can call inside the callback:

val mono = Mono.create<String> { sink ->
    executeRequest(object : Callback<String> {
        override fun success(data: String) {
            sink.success(data)
        }

        override fun error(t: Throwable) {
            sink.error(t)
        }
    })
}
mono.subscribe { println(it) }

Calling success on MonoSink<T> will actually trigger onNext and onComplete callbacks which will terminate a stream.
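The single-lambda subscribe used above only handles the emitted value, so the error branch of the callback has no handler. A minimal sketch of handling both outcomes with the two-argument subscribe overload could look like this. The succeed flag, requestAsMono and describe are hypothetical additions that make the outcome deterministic instead of random.

```kotlin
import reactor.core.publisher.Mono

interface Callback<T> {
    fun success(data: T)
    fun error(t: Throwable)
}

// Hypothetical deterministic variant of executeRequest, so both outcomes can be shown.
fun executeRequest(succeed: Boolean, callback: Callback<String>) {
    if (succeed) {
        callback.success("Async success")
    } else {
        callback.error(IllegalStateException("Async error"))
    }
}

fun requestAsMono(succeed: Boolean): Mono<String> = Mono.create<String> { sink ->
    executeRequest(succeed, object : Callback<String> {
        override fun success(data: String) = sink.success(data)
        override fun error(t: Throwable) = sink.error(t)
    })
}

// Subscribes with a value consumer and an error consumer and reports what arrived.
fun describe(succeed: Boolean): String {
    var outcome = "no signal"
    requestAsMono(succeed).subscribe(
        { value -> outcome = "value: $value" },
        { error -> outcome = "error: ${error.message}" }
    )
    return outcome
}

fun main() {
    println(describe(true))
    println(describe(false))
}
```

Because the callback here is invoked synchronously during subscription, describe can return the outcome right away; with a truly asynchronous API the signal would arrive later.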

Subscriber

In the beginning, we saw a simple subscriber that just prints whatever the stream produces, which can be either a result or an error. The problem with this is that the Specification provides a lot more information that can help us, so let’s dive into that. The simplest implementation of a Subscriber<T> that just prints out every event with the provided data would look something like this:

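import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Mono

val mono = Mono.just(10)
mono.subscribe(object : Subscriber<Int> {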
    override fun onComplete() {
        println("onComplete")
    }

    override fun onSubscribe(s: Subscription?) {
        println("onSubscribe")
    }

    override fun onNext(t: Int?) {
        println("onNext $t")
    }

    override fun onError(t: Throwable?) {
        println("onError $t")
    }
})

But if we run this, we see only the onSubscribe log, even though we know a Mono<T> should either succeed (onNext called once, followed by onComplete) or fail (only onError called once). Why? To answer this, we have to check out the Subscription interface defined in the Specification - it has a method to request data, and nothing is emitted until data is requested. This means we have to save a reference to the Subscription in the onSubscribe method and call the request method on it:

var subscription: Subscription? = null
val mono = Mono.just(10)
mono.subscribe(object : Subscriber<Int> {
    override fun onComplete() {
        println("onComplete")
    }

    override fun onSubscribe(s: Subscription?) {
        subscription = s
        println("onSubscribe")
    }

    override fun onNext(t: Int?) {
        println("onNext $t")
    }

    override fun onError(t: Throwable?) {
        println("onError $t")
    }
})
subscription?.request(1)

This now prints the number 10, followed by a log line that shows us the stream completed successfully. Luckily, this whole process is hidden from us in a Spring application so we never have to do that, but it’s always good to know what’s happening behind the scenes.
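As a variation on the same idea, the request can also be issued directly inside onSubscribe, which avoids keeping the Subscription in an outer variable. This is only a sketch; collectSignals is a made-up helper that records the signals in a list instead of printing them.

```kotlin
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Mono

// Hypothetical helper: subscribes to Mono.just(10) and records every signal it receives.
fun collectSignals(): List<String> {
    val signals = mutableListOf<String>()
    Mono.just(10).subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription) {
            signals.add("onSubscribe")
            // Request the single item as soon as the subscription is handed to us.
            s.request(1)
        }

        override fun onNext(t: Int) {
            signals.add("onNext $t")
        }

        override fun onError(t: Throwable) {
            signals.add("onError $t")
        }

        override fun onComplete() {
            signals.add("onComplete")
        }
    })
    return signals
}

fun main() {
    collectSignals().forEach { println(it) }
}
```

For this synchronous source, the recorded order should be onSubscribe, onNext 10, onComplete.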

Flux

Flux<T> is another implementation of a Publisher<T> interface provided by the Project Reactor team that can produce 0 to N items. Here’s its marble diagram.

Flux marble diagram

Flux<T> is usually used when we need to process one stream item at a time or when we have an infinite stream, in which case we also might need the backpressure support that’s built into it.

In my experience, Flux<T> is not used as often as Mono<T>, but I guess that can vary due to different application needs. Also, almost everything mentioned for Mono<T> can be applied to Flux<T>, the exception being creation with the create factory method.

Create

To see how the creation of a Flux<T> is different, we can use a slightly adjusted version of the callback example:

import kotlin.random.Random
import reactor.core.publisher.Flux

fun executeRequest(callback: Callback<List<String>>) {
    if (Random.nextBoolean()) {
        callback.success(listOf("Async success 1", "Async success 2", "Async success 3"))
    } else {
        callback.error(Throwable("Async error"))
    }
}

interface Callback<T> {
    fun success(data: T)
    fun error(t: Throwable)
}

In this case, the success callback method receives a list of items and we want to emit those items as a stream of 3 items.

val flux = Flux.create<String> { sink ->
    executeRequest(object : Callback<List<String>> {
        override fun success(data: List<String>) {
            data.forEach { item ->
                sink.next(item)
            }
            sink.complete()
        }

        override fun error(t: Throwable) {
            sink.error(t)
        }
    })
}

flux.subscribe { println(it) }

As we can see, the only difference is that for each item we have to call next and, at the end of the processing, call complete. If you check out what we did for Mono<T>, there’s only one call of success method which will trigger onNext callback only once and terminate a stream by calling onComplete callback. Flux<T> will trigger exactly those callbacks, but onNext will be called multiple times. In case of an error, the stream will terminate with onError callback which is exactly the same as Mono<T>.

Backpressure

When dealing with streams of data that contain more than one item, you might need to deal with backpressure. We’ve seen what backpressure is in the previous post, but since this is one of the most important things to consider, it’s worth repeating. Backpressure is a mechanism that can slow down a producer so that consumers can keep up with it. We can imagine a case where all the producer does is read from a database, while the consumer has to run some complex business logic and send data over the network. For such cases, Flux<T> has onBackpressure* methods which we can use to control its behavior.
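As a rough illustration of one of those methods, the sketch below uses onBackpressureDrop together with a subscriber that only ever requests 3 items. Flux.range stands in for a fast producer, and dropDemo, received and dropped are names invented for this example. BaseSubscriber is Reactor’s helper class for writing custom subscribers.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

// Hypothetical helper: returns the items the subscriber received and the items that were dropped.
fun dropDemo(): Pair<List<Int>, List<Int>> {
    val received = mutableListOf<Int>()
    val dropped = mutableListOf<Int>()
    Flux.range(1, 10)                                     // fast producer: 10 items
        .onBackpressureDrop { item -> dropped.add(item) } // items without demand are dropped
        .subscribe(object : BaseSubscriber<Int>() {
            override fun hookOnSubscribe(subscription: Subscription) {
                // Slow consumer: signal demand for 3 items only.
                request(3)
            }

            override fun hookOnNext(value: Int) {
                received.add(value)
            }
        })
    return received to dropped
}

fun main() {
    val (received, dropped) = dropDemo()
    println("received: $received")
    println("dropped: $dropped")
}
```

In this synchronous setup, the subscriber should receive the first 3 items while the remaining 7 are handed to the drop callback. Other onBackpressure* variants, such as buffering, trade memory for not losing items.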

Conclusion

In this post, we’ve seen the basics of Project Reactor’s building blocks, but there’s much more to it. Because its usage is (for the most part) coupled with Spring Framework, we will dive into that next and see how you can use Mono<T> or Flux<T> as request and response types in a Spring Boot application. Additionally, we will see what the potential issues with using blocking calls in your backend service are and how to avoid them.