Speed Up Services With Reactive API in Java EE 8
This article is part of a series about writing microservices with Java EE 8. All these topics and much more will be described in more detail in my upcoming book Java EE 8 Microservices, which I co-author with Mert Çaliskan and Pavel Pscheidl.
If our service calls multiple microservices and waits for each call to finish and return its result before making the next call, it's a good candidate for refactoring with a reactive API. To make the service more efficient, it can make all the calls to external services in parallel, provided they don't depend on each other. This decreases the time spent waiting and thus speeds up the microservice.
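To make the difference concrete, here is a minimal sketch that uses only the JDK's CompletableFuture rather than JAX-RS. The slowCall helper is a hypothetical stand-in for a remote service that takes some time to respond; its name, values and delay are assumptions made for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelCallsSketch {

    // Hypothetical stand-in for a remote call: sleeps, then returns a value.
    static int slowCall(int value, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return value;
    }

    // Sequential: the second call starts only after the first has returned.
    static int sequentialSum(long delayMillis) {
        return slowCall(1, delayMillis) + slowCall(2, delayMillis);
    }

    // Parallel: both calls are started before either result is awaited.
    static int parallelSum(long delayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Integer> first =
                    CompletableFuture.supplyAsync(() -> slowCall(1, delayMillis), pool);
            CompletableFuture<Integer> second =
                    CompletableFuture.supplyAsync(() -> slowCall(2, delayMillis), pool);
            // thenCombine merges the two results once both are available.
            return first.thenCombine(second, Integer::sum).join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Both methods return the same sum. With a delay of d per call, the sequential variant needs roughly 2d and the parallel one roughly d, as long as the calls are independent.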
To call REST services in parallel, we'll use the new reactive client API in JAX-RS and combine the results with the RxJava library as they become available. This combination lets us write clean and efficient code, with the additional benefit that the current thread can be released for further processing while waiting for results from remote calls.
```java
CompletionStage<Forecast> stage = temperatureServiceTarget
    .request()
    .rx()
    .get(Temperature.class)
    .thenApply(temperature -> new Forecast(temperature));
```
The above code will call a temperature service and then register a lambda expression to process the resulting temperature when it arrives. This maps the temperature to a forecast object, which can be accessed with the stage variable later.
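The way a CompletionStage is mapped and later consumed can be tried out without a running service. The following is a minimal sketch using only the JDK: the Temperature and Forecast classes are hypothetical simplifications (the article's real classes are not shown), and fetchTemperature is an assumed stand-in for the JAX-RS client call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class StageMappingSketch {

    // Hypothetical value types, simplified for illustration.
    static final class Temperature {
        final double degrees;
        Temperature(double degrees) { this.degrees = degrees; }
    }

    static final class Forecast {
        final Temperature temperature;
        Forecast(Temperature temperature) { this.temperature = temperature; }
    }

    // Assumed stand-in for the remote temperature call.
    static CompletionStage<Temperature> fetchTemperature() {
        return CompletableFuture.supplyAsync(() -> new Temperature(21.5));
    }

    static CompletionStage<Forecast> forecastStage() {
        // thenApply registers the mapping; it runs once the temperature arrives.
        return fetchTemperature().thenApply(Forecast::new);
    }

    // handle() receives either the forecast or the failure, so errors are not lost.
    static CompletionStage<String> describe(CompletionStage<Forecast> stage) {
        return stage.handle((forecast, error) ->
                error != null
                        ? "error: " + error.getMessage()
                        : "forecast: " + forecast.temperature.degrees);
    }
}
```

Nothing here blocks the calling thread: describe only attaches another callback. A caller that really needs the value synchronously can use toCompletableFuture().join(), but that gives up the benefit of the asynchronous style.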
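To get an RxJava Flowable instead of a CompletionStage, we register RxFlowableInvokerProvider on the target and pass RxFlowableInvoker.class to rx():

```java
Flowable<Forecast> flowable = temperatureServiceTarget
    .register(RxFlowableInvokerProvider.class)
    .request()
    .rx(RxFlowableInvoker.class)
    .get(Temperature.class)
    .map(temperature -> new Forecast(temperature));
```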
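The RxJava invoker is provided by the Jersey RxJava 2 client extension, which has to be added as a dependency:

```xml
<dependency>
    <groupId>org.glassfish.jersey.ext.rx</groupId>
    <artifactId>jersey-rx-client-rxjava2</artifactId>
    <version>2.26</version>
</dependency>
```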
Given two such flowables, flowable1 and flowable2, we can combine them into a single sequence and register callbacks on it:

```java
Flowable.concat(flowable1, flowable2)
    .doOnNext(forecast -> {
        forecasts.add(forecast);
    })
    .doOnComplete(() -> {
        asyncResponse.resume(Response.ok(forecasts).build());
    })
    .doOnError(asyncResponse::resume)
    .subscribe();
```
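For each forecast received from either flowable, we add it to a list of forecasts. When both flowables have completed, we send the list as the response; if an error occurs, we send an error response instead. The final call to subscribe() is necessary to activate the pipeline; without it, the registered callbacks would never run. Note that concat subscribes to flowable2 only after flowable1 has completed; if the sources should be subscribed to at the same time, Flowable.merge is the RxJava operator for that, at the cost of no guaranteed ordering of results.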
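The same collect-then-respond shape can also be sketched without RxJava, using only CompletableFuture from the JDK. This is a swapped-in technique, not the article's code: the respond callback is a hypothetical stand-in for asyncResponse.resume, and forecasts are plain strings to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CollectAndRespondSketch {

    // Waits for every source, then passes either the full list or the
    // failure to the callback. The callback is invoked exactly once.
    static CompletableFuture<Void> collect(List<CompletableFuture<String>> sources,
                                           Consumer<Object> respond) {
        return CompletableFuture
                .allOf(sources.toArray(new CompletableFuture[0]))
                .handle((ignored, error) -> {
                    if (error != null) {
                        // Plays the role of doOnError(asyncResponse::resume).
                        respond.accept(error);
                    } else {
                        List<String> forecasts = new ArrayList<>();
                        for (CompletableFuture<String> source : sources) {
                            forecasts.add(source.join()); // already complete here
                        }
                        // Plays the role of the doOnComplete callback.
                        respond.accept(forecasts);
                    }
                    return null;
                });
    }
}
```

Because the list is built only after all sources have completed, results keep the order of the sources, much like concat, while the sources themselves are free to run at the same time.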
You may have also noticed the asyncResponse variable used to send the final response or signal an error. This is a JAX-RS asynchronous response instance, which is used to complete a REST response at a later time, when the data is available, without blocking the initial processing thread. Using the asynchronous response helps us save thread resources while waiting for results from external REST services.

To turn on asynchronous processing in our REST endpoint, we inject javax.ws.rs.container.AsyncResponse as a REST method argument, annotated with @Suspended. We also change the return type to void because we'll be building the response using the AsyncResponse instance:
```java
@GET
@Produces(MediaType.APPLICATION_JSON)
public void getForecasts(@Suspended AsyncResponse asyncResponse) {
    // ...asynchronous calls to REST services go here...
    asyncResponse.resume(...);
}
```
Final code example
The following code will:
- turn on asynchronous processing of REST requests in the getForecasts method
- set a 5-minute timeout on the asynchronous response
- execute the temperature service twice, for London and Beijing, without waiting for results
- combine the results into a sequence of forecasts
- add every forecast in the sequence into a list
- send the complete list when all results have been processed
- send an error result in case of an exception
- register the handlers with the subscribe method
```java
private Flowable<Forecast> getTemperature(String location) {
    return temperatureTarget
        .register(RxFlowableInvokerProvider.class)
        .resolveTemplate("city", location)
        .request()
        .rx(RxFlowableInvoker.class)
        .get(Temperature.class)
        .map(temperature -> new Forecast(location, temperature));
}

@GET
@Produces(MediaType.APPLICATION_JSON)
public void getForecasts(@Suspended AsyncResponse asyncResponse) {
    List<Forecast> forecasts = new ArrayList<>();
    asyncResponse.setTimeout(5, TimeUnit.MINUTES);
    Flowable.concat(getTemperature("London"), getTemperature("Beijing"))
        .doOnNext(forecast -> {
            forecasts.add(forecast);
        })
        .doOnComplete(() -> {
            asyncResponse.resume(Response.ok(forecasts).build());
        })
        .doOnError(asyncResponse::resume)
        .subscribe();
}
```