Updated on 25 Feb 2021
Since this blog post was first published, there have been some changes in the Spring Data R2DBC library that’s used for accessing the database. The most recent version of the code can be found here, and I’ll do my best to keep that branch up to date.
Overview
In the second post, we talked about the basics of Project Reactor, and now it’s time to apply them in practice. Instead of creating a Spring WebFlux application from scratch, we will see how to migrate a small Spring MVC application to Spring WebFlux. This will show how seamless the migration is and what the Spring WebFlux counterparts to well-known Spring MVC components are, and by the end of this blog post we will have a fully functional reactive application.
Motivation
Before Spring WebFlux arrived in Spring Framework 5.0, the only option was the classic Spring Web MVC framework built on top of the Servlet API. To this day, this framework is the most popular choice for new Spring projects and it works like a charm, but the reactive, non-blocking programming model has been gaining traction in recent years. The idea behind it is to reduce the time an application spends blocked, waiting for data to arrive (from a database, another service, a message queue, etc.), which can make an application faster, as shown here and here. So, the next time you are creating a new Spring project, consider making it reactive or, if you have the chance, migrating an existing service.
Now, you might be wondering: “Should I then migrate every service I have to this new reactive model?” Of course, the answer is no. Blindly migrating existing codebases to a new programming model takes time, opens the door for new bugs to creep in, and might even decrease performance if not done properly. If your service is idle most of the time or is not under high load, the simple, blocking model will work just fine. Also, existing blocking libraries (which are still prevalent) will work without any additional adjustments. However, if some part of your system is under high load and relies on a lot of threading to make things faster, switching to a reactive model might be a good idea.
Luckily, thanks to the efforts of the Spring team, switching from the blocking model is simple when using Spring Boot. The following sections contain a step-by-step guide on how to do the migration. Each step has a link to the corresponding branch of the project on GitHub.
Spring Web MVC application
To start, let’s go to https://start.spring.io/ and create a simple Spring application with only one dependency, spring-boot-starter-web. This will give us everything we need to create an endpoint that, for a start, returns a hardcoded list of programming languages. As a side note, I’ll be using Kotlin in my examples, but everything can be done in Java as well.
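import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RequestMapping("/languages")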
@RestController
class LanguagesController {
@GetMapping
fun getLanguages(): List<Language> {
return languages
}
}
data class Language(
val name: String
)
val languages = listOf(
Language("Kotlin"),
Language("Java"),
Language("Go"),
Language("Python")
)
The next step is to move the hardcoded list of languages to a database and use existing Spring primitives to fetch the list from there. In our case, we will use a PostgreSQL database and JdbcTemplate, but you can use whatever database and (blocking) driver you want. For this, we need two new dependencies, a few Spring properties, and a running database (how to run a database and fill it with data is described in the project’s README):
implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
runtimeOnly("org.postgresql:postgresql")
spring.datasource.url=jdbc:postgresql://localhost:5432/webflux-demo-database
spring.datasource.username=postgres
spring.datasource.password=postgres
After this update, the code looks like this (the hardcoded list of languages is gone):
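import java.sql.ResultSet
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RequestMapping("/languages")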
@RestController
class LanguagesController(
private val jdbcTemplate: JdbcTemplate
) {
@GetMapping
fun getLanguages(): List<Language> {
return jdbcTemplate.query("select * from languages") { rs: ResultSet, _: Int ->
Language(name = rs.getString("name"))
}
}
}
data class Language(
val name: String
)
The next step is to introduce an external service that returns the year in which a given language was published. To mock the external service, we’ll use Mocky, and to call it, we’ll use the old, faithful RestTemplate. Another side note: the routes shown below might have expired, in which case here’s the response those endpoints should return:
{
"year": 1945
}
With the mocks in place, we can call them from our existing code:
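import java.sql.ResultSet
import org.springframework.boot.web.client.RestTemplateBuilder
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.RestTemplate
val urls = mapOf(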
"Kotlin" to "https://run.mocky.io/v3/6654273e-456d-40ce-9209-c879c93a844d",
"Java" to "https://run.mocky.io/v3/f04c90fc-7529-4f0b-a9d7-90e92105d5bf",
"Go" to "https://run.mocky.io/v3/2eece29e-29d8-4d59-aa81-9d47886f2e17",
"Python" to "https://run.mocky.io/v3/72bbad6e-2127-4f8a-a9c1-2fdf15b0c18c"
)
@RequestMapping("/languages")
@RestController
class LanguagesController(
private val jdbcTemplate: JdbcTemplate,
private val restTemplate: RestTemplate
) {
@GetMapping
fun getLanguages(): List<Language> {
val languages = jdbcTemplate.query("select * from languages") { rs: ResultSet, _: Int ->
Language(name = rs.getString("name"))
}
return languages.map { language ->
val languageYear = restTemplate.getForEntity(urls[language.name].toString(), LanguageYear::class.java)
language.copy(year = languageYear.body?.year)
}
}
}
data class Language(
val name: String,
val year: Int? = null
)
data class LanguageYear(
val year: Int
)
@Configuration(proxyBeanMethods = false)
class RestTemplateConfiguration {
@Bean
fun restTemplate(restTemplateBuilder: RestTemplateBuilder): RestTemplate {
return restTemplateBuilder.build()
}
}
At this point, we have everything we need to show how to migrate this blocking Spring application to a reactive, non-blocking stack.
Code for this step is available here.
Endpoint migration
The simplest thing you can do to start the migration is to change the return type of the endpoint to a reactive primitive. Before that, we need to add the Spring WebFlux dependency, which brings the Project Reactor library into our project:
implementation("org.springframework.boot:spring-boot-starter-webflux")
In our case, we are returning a List<Language>, so we need to change that to a Flux<Language> and use one of the factory methods to create a Flux from a List.
@GetMapping
fun getLanguages(): Flux<Language> {
val languages = jdbcTemplate.query("select * from languages") { rs: ResultSet, _: Int ->
Language(name = rs.getString("name"))
}
return Flux.fromIterable(
languages.map { language ->
val languageYear = restTemplate.getForEntity(urls[language.name].toString(), LanguageYear::class.java)
language.copy(year = languageYear.body?.year)
}
)
}
The problem is that this won’t help the application’s performance at all: we still make blocking calls to both the database and the external service, and they all complete before the Flux is even created. Still, it’s nice to be able to introduce reactive primitives as return types at the top level and keep a fully functioning application. You can also use the rich operator API that Project Reactor offers instead of the much more limited Stream API that Java offers.
One important thing to notice here is that we don’t have to call subscribe at any point: Spring does that for us. This will be true in most cases: as long as every piece of execution bubbles up to the controller layer, Spring will automatically subscribe to our chain. Probably the most common example where subscription needs to be done manually is message queues: when a message arrives, we need to perform some action that is not in any way connected to our interface layer.
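The “nothing happens until someone subscribes” behavior can be illustrated without Reactor at all. The sketch below uses a hypothetical ColdSource class (not a Reactor type) and a fakeQuery function standing in for the database call: building the chain only stores functions, subscribe() is what runs them, and every new subscription runs the whole chain again, which is also how a cold Flux or Mono behaves.

```kotlin
// A toy "cold" source, hypothetical and NOT Reactor's API. Building a chain only
// stores functions; nothing executes until subscribe() is called.
class ColdSource<T>(private val producer: () -> T) {
    // map() wraps the previous step in a new source; still nothing runs here.
    fun <R> map(transform: (T) -> R): ColdSource<R> = ColdSource<R> { transform(producer()) }

    // subscribe() is the moment the whole chain actually executes.
    fun subscribe(onNext: (T) -> Unit) {
        onNext(producer())
    }
}

// Counts how many times the pretend database query really ran.
var queriesExecuted = 0

// Hypothetical stand-in for the JDBC query from the article.
fun fakeQuery(): List<String> {
    queriesExecuted++
    return listOf("Kotlin", "Java", "Go", "Python")
}

// Assembles the chain "query, then count the rows" without running it.
fun buildChain(): ColdSource<Int> = ColdSource(::fakeQuery).map { it.size }
```

In a Spring application, the framework is the one calling subscribe() on whatever a controller returns; in a message listener, that call is yours to make.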
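Also, if we check the logs when the application starts, we can see that the application is still running on a Tomcat server, while a fully reactive application runs on Netty by default. This is because, when both the Spring Web MVC and Spring WebFlux starters are on the classpath, Spring Boot sets up a Servlet-based (MVC) application. For now, this is not that relevant, but it will be when we complete the migration.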
Code for this step is available here.
RestTemplate migration
Migrating calls to external services is probably the best way to start the migration to a reactive stack. Those calls are usually the ones that take longer than anything else, so it makes sense to use non-blocking execution for them. At this stage, we will revert the change from the previous section and pretend it never happened. In the next section, we will combine the two.
To do this, we will remove RestTemplate and replace it with its reactive counterpart, WebClient.
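import java.sql.ResultSet
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jdbc.core.JdbcTemplate
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.reactive.function.client.WebClient
@RequestMapping("/languages")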
@RestController
class LanguagesController(
private val jdbcTemplate: JdbcTemplate,
private val webClient: WebClient
) {
@GetMapping
fun getLanguages(): List<Language> {
val languages = jdbcTemplate.query("select * from languages") { rs: ResultSet, _: Int ->
Language(name = rs.getString("name"))
}
return languages.map { language ->
val languageYear = webClient
.get()
.uri(urls[language.name].toString())
.retrieve()
.bodyToMono(LanguageYear::class.java)
.block()
language.copy(year = languageYear?.year)
}
}
}
@Configuration(proxyBeanMethods = false)
class WebClientConfiguration {
@Bean
fun webClient(): WebClient {
return WebClient.create()
}
}
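Notice that we still have to call the block() method to retrieve data from the endpoint, meaning we are still not non-blocking. Calling block() is allowed here only because requests are still handled by Tomcat threads; on Netty’s event-loop threads, Reactor refuses to block and throws an exception instead. Still, we are getting there :)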
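Why block() keeps us blocking can be sketched with plain JDK futures: calling join() on a CompletableFuture inside map plays the role of calling block() on each Mono, so the calling thread waits for every request in turn and the total time is roughly the sum of all calls. The fakeYearCall and fetchYearsBlocking functions, the 100 ms delay and the thread pool are hypothetical stand-ins for the Mocky endpoints and the HTTP client; 1945 is simply the value the mocked endpoints return.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for one remote call: completes on a pool thread after delayMs
// and returns the placeholder year the mocked endpoints respond with.
fun fakeYearCall(pool: ExecutorService, name: String, delayMs: Long): CompletableFuture<Pair<String, Int>> =
    CompletableFuture.supplyAsync({
        Thread.sleep(delayMs)
        name to 1945
    }, pool)

// Mirrors bodyToMono(...).block() inside map: each join() parks the caller until that
// single call finishes, so the calls run strictly one after another.
fun fetchYearsBlocking(names: List<String>, delayMs: Long): Pair<List<Pair<String, Int>>, Long> {
    val pool = Executors.newFixedThreadPool(names.size.coerceAtLeast(1))
    try {
        val start = System.nanoTime()
        val results = names.map { name -> fakeYearCall(pool, name, delayMs).join() }
        val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
        return results to elapsedMs
    } finally {
        pool.shutdown() // let the JVM exit once the demo is done
    }
}
```

With four languages and 100 ms per call, this takes roughly 400 ms; the same calls started without waiting for each other would take about as long as the slowest one.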
Code for this step is available here.
RestTemplate + endpoint migration
Let’s combine the two previous sections and migrate everything except the database call to the reactive stack.
@GetMapping
fun getLanguages(): Flux<Language> {
val languages = jdbcTemplate.query("select * from languages") { rs: ResultSet, _: Int ->
Language(name = rs.getString("name"))
}
return Flux.fromIterable(languages)
.flatMap { language ->
// fetch published year for each language
val languageYearResponse = webClient
.get()
.uri(urls[language.name].toString())
.retrieve()
.bodyToMono(LanguageYear::class.java)
// update language model with a fetched year
languageYearResponse
.map { languageYear -> language.copy(year = languageYear.year) }
}
}
@Configuration(proxyBeanMethods = false)
class WebClientConfiguration {
@Bean
fun webClient(): WebClient {
return WebClient.create()
}
}
So, we’ve replaced RestTemplate with WebClient, wrapped the languages fetched from the database in a Flux, removed the block() call on the WebClient request to the external service, and used the flatMap operator, which, for each language, makes an API call and updates the language model returned by the endpoint.
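One consequence of flatMap worth knowing: it subscribes to the inner Monos as they are produced rather than waiting for each one to finish, so the HTTP calls overlap and results are emitted in the order they complete, not in the order the languages came from the database. If the order matters, Reactor also offers concatMap (one inner call at a time) and flatMapSequential (concurrent calls, original order). The plain-JDK sketch below imitates that behavior with CompletableFuture; the delays are made up and fetchAllConcurrently is a hypothetical helper, not part of any library.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Starts every simulated call up front (as flatMap subscribes to each inner Mono without
// waiting for the previous one) and records the order in which the calls finish.
fun fetchAllConcurrently(delaysMs: Map<String, Long>): Pair<List<String>, Long> {
    val pool = Executors.newFixedThreadPool(delaysMs.size.coerceAtLeast(1))
    try {
        val completionOrder = ConcurrentLinkedQueue<String>()
        val start = System.nanoTime()
        val calls = delaysMs.map { (name, delay) ->
            CompletableFuture.runAsync({
                Thread.sleep(delay)       // pretend network latency
                completionOrder.add(name) // record when this call finished
            }, pool)
        }
        CompletableFuture.allOf(*calls.toTypedArray()).join() // wait only so the demo can report
        val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
        return completionOrder.toList() to elapsedMs
    } finally {
        pool.shutdown()
    }
}
```

With hypothetical delays of 450, 300, 0 and 150 ms for Kotlin, Java, Go and Python, the results arrive as Go, Python, Java, Kotlin, and the whole batch takes about as long as the slowest call rather than the sum of all of them.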
Code for this step is available here.
Database
Migrating database calls to a reactive stack is the trickiest of all the migrations so far. The biggest problem is that JDBC is a blocking API that does not fit very well into a reactive (non-blocking) model and, to solve that problem, we need a reactive database driver. The good news is that there is one: R2DBC. Spring supports it with an API similar to the one we used for JDBC, and it also provides a fluent API that still lacks more advanced features (especially compared to something like jOOQ). The bad news is that this technology is still really young and, although it is production-ready, many people are not willing to take the risk (which is perfectly reasonable). Also, if you are using libraries like Flyway or Liquibase to manage database migrations, they don’t support R2DBC (at least not at the time of writing this blog post), so you have to configure both JDBC and R2DBC connections.
Is there a way around this? Is there a way to use existing, blocking database drivers and libraries but still benefit from the reactive model? Well, there is, but it’s a bit controversial. Project Reactor ships with different types of schedulers that enable us to move execution to a background thread and free up our main threads. It’s a bit more convoluted than this, but we’ll get back to that later. First, we will see how to migrate a database call to a reactive database driver.
R2DBC
To include all the necessary dependencies for R2DBC to work, you should replace:
implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
runtimeOnly("org.postgresql:postgresql")
with:
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
runtimeOnly("io.r2dbc:r2dbc-postgresql")
and for application properties replace:
spring.datasource.url=jdbc:postgresql://localhost:5432/webflux-demo-database
spring.datasource.username=postgres
spring.datasource.password=postgres
with:
spring.r2dbc.url=r2dbc:postgresql://localhost:5432/webflux-demo-database
spring.r2dbc.username=postgres
spring.r2dbc.password=postgres
This will give you an autoconfigured instance of DatabaseClient that you can inject into your controller and use to fetch data from the database. After this change, the code looks like this:
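// DatabaseClient from Spring Data R2DBC 1.x: its fluent select() API is no longer
// available in newer versions (see the update note at the top of the post).
import org.springframework.data.r2dbc.core.DatabaseClient
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
@RequestMapping("/languages")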
@RestController
class LanguagesController(
private val webClient: WebClient,
private val databaseClient: DatabaseClient
) {
@GetMapping
fun getLanguages(): Flux<Language> {
return databaseClient.select().from("languages").`as`(Language::class.java).fetch().all()
.flatMap { language ->
// fetch published year for each language
val languageYearResponse = webClient
.get()
.uri(urls[language.name].toString())
.retrieve()
.bodyToMono(LanguageYear::class.java)
// update language model with a fetched year
languageYearResponse
.map { languageYear -> language.copy(year = languageYear.year) }
}
}
}
Apart from injecting DatabaseClient instead of JdbcTemplate, the only change is the way we fetch languages from the database. We use a fluent API that fetches all rows from the languages table and converts each of them to a Language object. Everything after that line is the same as before.
If you play around with this API, you’ll notice that there’s a variant of the from method that accepts a class directly, but we can’t use it in our case since the class name is Language (singular) and the table name is languages (plural). Generally, if you stick to the mapping rules of this fluent API, you can use it for all the simple queries you might need, but the API also supports raw queries for maximum flexibility and control.
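To see why the class-based variant does not fit here, it helps to know that Spring Data Relational derives a default table name from the simple class name by converting camelCase to snake_case and lower-casing it. The function below is a plain-Kotlin approximation of that rule, written for illustration (it is not the library’s code); for a class named Language it yields language, which does not match our languages table. Annotating the class with @Table("languages") from org.springframework.data.relational.core.mapping is the usual way to override the derived name.

```kotlin
// Illustrative approximation of the default table-name rule (camelCase -> snake_case,
// lower-cased); not Spring Data's implementation.
fun defaultTableName(simpleClassName: String): String =
    simpleClassName
        .replace(Regex("([a-z0-9])([A-Z])"), "\$1_\$2") // insert "_" at each word boundary
        .lowercase()
```

For example, defaultTableName("LanguageYear") gives language_year.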
Code for this step is here.
JDBC with schedulers
Before we start adapting the blocking JDBC code to the reactive stack, we have to talk about the downsides of this approach and why you might not want to do it. As we saw before, the reactive programming model enables an application to process more requests in parallel by delegating work to other components instead of blocking. Interestingly enough, this is achieved by running only a few threads, and by default their number equals the number of CPU cores (Reactor Netty uses a minimum of four). So, when you include the Spring WebFlux starter (without the Spring MVC starter), you get that many event-loop threads run by the Netty server. When a request (event) arrives, it gets dispatched to some other component (WebClient, DatabaseClient, a reactive MQ client, etc.), and the thread that processed the event is free to handle a new one. When the data is ready, one of the threads picks it up and dispatches it further (maybe to some other component, maybe to the caller). For comparison, a standard Spring MVC application runs, by default, on a Tomcat server with a pool of 200 threads, and each request blocks one of them for its entire duration (unless execution is manually moved to another thread). This might seem like a lot, but imagine a system with thousands of requests per second, and you can expect your application to start throttling fast.
Now that we understand Netty’s threading model a bit better, we can guess what the problem with using anything blocking in a Spring WebFlux application might be. With only a few threads available, we have to be extremely careful not to block any of them, as that could be disastrous for our application. Luckily, there’s a way to avoid that (to some extent).
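A quick back-of-the-envelope calculation shows why a fixed pool of 200 blocking threads runs out. By Little’s law, if every request holds a thread for its whole latency, a thread-per-request server can complete at most threads / latency requests per second, and sustaining a given rate needs rate × latency threads. The function names are ours and the latencies in the usage note are hypothetical.

```kotlin
// Little's law applied to a thread-per-request server: each in-flight request pins
// one thread, so throughput is capped at threads / latency.
fun maxRequestsPerSecond(threads: Int, latencySeconds: Double): Double =
    threads / latencySeconds

// Threads needed to sustain a target rate when each request blocks for latencySeconds.
fun threadsNeeded(requestsPerSecond: Double, latencySeconds: Double): Double =
    requestsPerSecond * latencySeconds
```

With Tomcat’s default of 200 threads and requests that block for 100 ms, the ceiling is about 2,000 requests per second; if they block for 500 ms, it drops to about 400. Sustaining 5,000 requests per second at 200 ms each would need about 1,000 threads. On an event loop, threads are not held while waiting for I/O, so this bound does not apply in the same way.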
Schedulers are a powerful concept that enables you to switch the execution thread whenever you want, to whichever thread you want. Even better, this is done using the familiar concept of an operator, specifically subscribeOn, which accepts an instance of the Scheduler interface. Project Reactor offers quite a few schedulers out of the box, but I’ll focus on the boundedElastic scheduler in this post. The idea behind it is simple: create a bounded pool of ExecutorService workers on demand, reuse them when they are free, and remove them if they have not been used for some time (60 seconds by default). The important thing to notice here is that the pool is bounded. There’s also an elastic scheduler that can create an unbounded pool of threads (it has since been deprecated in favor of boundedElastic), but it can cause headaches if an application starts creating new threads uncontrollably.
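The “bounded” part is easiest to picture with a rough JDK analogy. The sketch configures a ThreadPoolExecutor with a thread cap, a bounded task queue and a 60-second idle timeout, then shows what happens once both threads are busy and the queue is full: the next task is rejected. This is only an analogy chosen for illustration; Reactor’s boundedElastic scheduler is implemented differently (for example, its task queues are per worker), and boundedElasticLike and demoSaturation are hypothetical names.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Rough analogy of a bounded elastic pool: at most maxThreads threads, idle threads
// are discarded after ttlSeconds, and at most queueCapacity tasks may wait.
fun boundedElasticLike(maxThreads: Int, queueCapacity: Int, ttlSeconds: Long = 60): ThreadPoolExecutor {
    val pool = ThreadPoolExecutor(
        maxThreads, maxThreads,
        ttlSeconds, TimeUnit.SECONDS,
        LinkedBlockingQueue<Runnable>(queueCapacity)
    )
    pool.allowCoreThreadTimeOut(true) // idle threads die after the TTL instead of living forever
    return pool
}

// Saturates a pool of 2 threads with a queue of 1 and reports (threads, queued, rejected?).
fun demoSaturation(): Triple<Int, Int, Boolean> {
    val pool = boundedElasticLike(maxThreads = 2, queueCapacity = 1)
    val release = CountDownLatch(1)
    try {
        repeat(3) { pool.execute { release.await() } } // 2 running + 1 waiting in the queue
        val rejected = try {
            pool.execute { }                           // no free thread and no queue slot left
            false
        } catch (e: RejectedExecutionException) {
            true
        }
        return Triple(pool.poolSize, pool.queue.size, rejected)
    } finally {
        release.countDown()
        pool.shutdown()
    }
}
```

demoSaturation() reports two threads, one queued task, and a rejected fourth task.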
So, to recap: we are using a blocking API in an application that has only a few available threads at any given time, and we are using a bounded pool of background threads to avoid blocking the main threads. How can we be sure that we will not run out of background threads? Well, we cannot be. By default, the boundedElastic scheduler creates at most 10 threads per CPU core, and each thread can have up to 100 000 tasks queued; once those limits are reached, new tasks are rejected with an error. That is a pretty huge number of tasks we can start, right? Again, it depends on the application load. If you are sure this limit will not be reached, I’d say using this approach with a blocking API is fine. Sure, you are not getting 100% out of the reactive model, but it’s still something. In the end, if you decide to switch to R2DBC at some point, you’ll only need to update the code in the repository layer.
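The defaults mentioned above translate into concrete numbers quickly. The sketch only does the arithmetic (10 threads per CPU core, 100 000 queued tasks per thread); the constants mirror the defaults described in this post, and the data class and function names are hypothetical. Reactor reads overrides for both values from the reactor.schedulers.defaultBoundedElasticSize and reactor.schedulers.defaultBoundedElasticQueueSize system properties, but it’s worth checking the documentation of your Reactor version.

```kotlin
// Default sizing of Reactor's boundedElastic scheduler as described in the text:
// a cap of 10 threads per CPU core and up to 100,000 queued tasks per thread.
const val THREADS_PER_CORE = 10
const val QUEUED_TASKS_PER_THREAD = 100_000L

data class BoundedElasticCapacity(val maxThreads: Int, val maxQueuedTasks: Long)

// Computes the thread cap and total queue capacity for a given number of cores.
fun defaultCapacity(cores: Int = Runtime.getRuntime().availableProcessors()): BoundedElasticCapacity {
    val maxThreads = THREADS_PER_CORE * cores
    return BoundedElasticCapacity(maxThreads, maxThreads * QUEUED_TASKS_PER_THREAD)
}
```

On an 8-core machine, that is 80 threads and up to 8 000 000 queued tasks before new work is rejected.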
This should be enough detail about the internals of schedulers, so it’s time to look at how we can use them in code. For this, we will use the version of the application from before the R2DBC migration, since we want the blocking JDBC driver in place. Our code that fetches languages from the database looks like this:
val languages = jdbcTemplate.query("select * from languages") { rs: ResultSet, _: Int ->
Language(name = rs.getString("name"))
}
In the previous article, we learned that wrapping a blocking piece of code in Mono.just or Flux.fromIterable causes the expression to be executed immediately, when the chain is assembled, and that a better approach is to use something like Mono.fromSupplier, which defers the work until subscription. That is exactly what we need in this case.
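The difference comes down to when Kotlin evaluates the argument. The sketch below uses two hypothetical holder classes (not Reactor types) that mimic the evaluation order of Mono.just(value) and Mono.fromSupplier(supplier), plus a counter showing when the pretend blocking query actually runs.

```kotlin
// Mimics Mono.just(value): the caller has already computed value before the holder exists.
class EagerHolder<T>(val value: T)

// Mimics Mono.fromSupplier(supplier): the work runs only when get() is called
// (the analogue of subscribing), and runs again on every call.
class DeferredHolder<T>(private val supplier: () -> T) {
    fun get(): T = supplier()
}

// Counts how many times the pretend blocking query really ran.
var blockingQueryRuns = 0

// Hypothetical stand-in for jdbcTemplate.query(...).
fun blockingQuery(): List<String> {
    blockingQueryRuns++
    return listOf("Kotlin", "Java", "Go", "Python")
}
```

Creating EagerHolder(blockingQuery()) runs the query on the spot, on whatever thread builds the chain, while DeferredHolder { blockingQuery() } runs nothing until get() is called. That deferral is what later lets a scheduler decide which thread does the blocking work.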
val languages = Mono.fromSupplier {
jdbcTemplate.query("select * from languages") { rs: ResultSet, _: Int ->
Language(name = rs.getString("name"))
}
}
The problem is that this now breaks the rest of the code, which goes to an external service to fetch more data about each language. This is because JdbcTemplate returns a list, which means we have a Mono<List<Language>>, but we need a Flux<Language>. One simple trick here is to use the flatMapIterable operator, which does that conversion:
return languages
.flatMapIterable { it }
.flatMap { language ->
// fetch published year for each language
val languageYearResponse = webClient
.get()
.uri(urls[language.name].toString())
.retrieve()
.bodyToMono(LanguageYear::class.java)
// update language model with a fetched year
languageYearResponse
.map { languageYear -> language.copy(year = languageYear.year) }
}
It might look a bit magical, but it’s really simple: the mapper function returns an Iterable, whose items are then emitted one at a time, which effectively flattens that Iterable (hence the name :)).
But we still have the issue of the blocking call being executed on the main thread. To fix this, we use the boundedElastic scheduler described earlier:
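The same idea exists in the Kotlin standard library. In the sketch, a Mono that holds one list is modeled as a collection with at most one element, and flatMapIterableLike is a hypothetical helper that mirrors what flatMapIterable { it } does to it: the single list becomes its individual items, and an empty source stays empty.

```kotlin
// Models Mono<List<T>> as a collection holding zero or one list, and flattens it the way
// flatMapIterable { it } would: every item of the inner Iterable is emitted on its own.
fun <S, T> flatMapIterableLike(zeroOrOne: List<S>, mapper: (S) -> Iterable<T>): List<T> =
    zeroOrOne.flatMap(mapper)
```

For example, flatMapIterableLike(listOf(listOf("Kotlin", "Java"))) { it } gives the two languages as separate items.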
return languages
.flatMapIterable { it }
.flatMap { language ->
// fetch published year for each language
val languageYearResponse = webClient
.get()
.uri(urls[language.name].toString())
.retrieve()
.bodyToMono(LanguageYear::class.java)
// update language model with a fetched year
languageYearResponse
.map { languageYear -> language.copy(year = languageYear.year) }
}
.subscribeOn(Schedulers.boundedElastic())
It doesn’t matter where you put the subscribeOn operator in the chain, so it’s usually placed at the top or the bottom (if a chain contains several subscribeOn operators, only the one closest to the source takes effect). If you log thread names in this case, you’ll see that the database query runs on a boundedElastic thread, not the one used for other parts of the chain, which is exactly what we want. If you are wondering why you don’t have to worry about data availability and synchronization, you can thank the creators of Project Reactor.
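For intuition, a plain-JDK version of the same hop looks like this: the blocking supplier runs on a separate, named pool instead of the calling thread, and the thread name proves it (Reactor’s own workers show up as boundedElastic-N in logs). The runOffCaller function and the blocking-pool- prefix are hypothetical, and the join() call exists only so the example can return a value; with subscribeOn, the calling thread does not wait.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Runs a blocking supplier on a dedicated, named pool and returns the result together
// with the name of the thread that executed it (a rough analogy of
// Mono.fromSupplier(...).subscribeOn(Schedulers.boundedElastic())).
fun <T> runOffCaller(blocking: () -> T): Pair<T, String> {
    val counter = AtomicInteger()
    val pool = Executors.newFixedThreadPool(2) { task ->
        Thread(task, "blocking-pool-${counter.incrementAndGet()}")
    }
    try {
        return CompletableFuture
            .supplyAsync({ blocking() to Thread.currentThread().name }, pool)
            .join() // only so the demo can return a value; Reactor would not block here
    } finally {
        pool.shutdown()
    }
}
```

Calling runOffCaller { listOf("Kotlin", "Java") } returns the list along with a thread name such as blocking-pool-1, which differs from the caller’s thread.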
Code for this step is here.
Final step
We’ve finally migrated our whole application to a reactive stack (even if we cheated a bit with JDBC and a scheduler). At this point, you can completely remove the Spring Web starter dependency and keep only the WebFlux one, which makes Spring Boot start a Netty server by default. This marks the end of the migration :)
Code for this step is here.
Conclusion
The Spring team did an amazing job of making the migration as seamless as possible. Reactive APIs are the norm in Spring, so every part of the framework is covered, but keep in mind that this stack is still not as battle-tested and proven as standard Spring MVC. In a community that is slow-moving and usually lives a few years in the past (just look at how many people are still using Java 8), this might be a dealbreaker and make adoption hard in general, but in the world of smaller services under high load, it might pay off. Even if it doesn’t, such a service is a small piece of code, so it can easily be converted back to Spring MVC.
In the next article, we will see how to test the code we wrote in this article and how we can replicate the production environment in our tests.