Speed Up Services With Reactive API in Java EE 8

This entry is part 1 of 1 in the series Java EE 8 Microservices

Services can often be optimized with asynchronous processing without changing their behavior towards the outside world. Some services are inefficient because they have to wait for other services to return a result before they can continue. Let’s look at how to call external REST services without waiting for them, make multiple independent calls in parallel, and combine their results later with a reactive pipeline in Java EE 8.

This topic and much more will be described in more detail in my upcoming book Java EE 8 Microservices, which I co-author with Mert Çaliskan and Pavel Pscheidl.

If our service calls multiple microservices and waits for each call to finish and return its result before making the next call, it’s a good candidate for refactoring with a reactive API. To make the service more efficient, it could make all the calls to external services in parallel, provided they don’t depend on each other. This would decrease the time spent waiting and thus speed up the microservice.
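To make the time saving concrete, here is a minimal sketch that uses only the JDK's CompletableFuture rather than JAX-RS. The slowCall method is a hypothetical stand-in for a remote service and the delays are made up; the only point is that independent calls started together take roughly as long as the slowest one, not the sum of all of them.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelCallsSketch {

    // Hypothetical stand-in for a remote call: sleeps to simulate network latency.
    static int slowCall(int value, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return value;
    }

    // One call after another: total time is roughly the sum of both delays.
    static int sequential(long delayMillis) {
        return slowCall(1, delayMillis) + slowCall(2, delayMillis);
    }

    // Both calls start immediately: total time is roughly the longer delay.
    static int parallel(long delayMillis, ExecutorService executor) {
        CompletableFuture<Integer> first =
                CompletableFuture.supplyAsync(() -> slowCall(1, delayMillis), executor);
        CompletableFuture<Integer> second =
                CompletableFuture.supplyAsync(() -> slowCall(2, delayMillis), executor);
        // join() blocks the caller; it is used here only to keep the sketch short.
        return first.thenCombine(second, Integer::sum).join();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            long start = System.nanoTime();
            sequential(200);
            long sequentialMillis = (System.nanoTime() - start) / 1_000_000;

            start = System.nanoTime();
            parallel(200, executor);
            long parallelMillis = (System.nanoTime() - start) / 1_000_000;

            System.out.println("sequential: " + sequentialMillis
                    + " ms, parallel: " + parallelMillis + " ms");
        } finally {
            executor.shutdown();
        }
    }
}
```

The sketch still blocks on join() at the end, which the reactive approach in the rest of this article avoids.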

To call REST services in parallel, we’ll use the new reactive client API in JAX-RS, together with the RxJava library to combine the results as they become available. This combination allows us to write clean and efficient code, with the additional benefit that the current thread can be released for further processing while waiting for results from remote calls.

We’ll build a pipeline which processes the results as they arrive and finally merges them into a single response. The first part of the pipeline will call each remote service. Instead of waiting for the results, we’ll specify what to do with each received result and continue with calling other services. Using the rx() method on the JAX-RS client request builder allows us to call a version of the get() method which returns immediately instead of waiting for the result. To process results when they arrive, we can chain handlers onto the CompletionStage returned from the rx version of the get() method:

    CompletionStage<Forecast> stage = temperatureServiceTarget
            .request()
            .rx()
            .get(Temperature.class)
            .thenApply(temperature -> new Forecast(temperature));

The code above calls a temperature service and registers a lambda expression to process the resulting temperature when it arrives. The lambda maps the temperature to a forecast object, which can be accessed through the stage variable later.
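The following JDK-only sketch illustrates that the handler is registered first and runs only once the value arrives. The Temperature and Forecast classes are hypothetical minimal stand-ins for the types used in this article, and a manually completed CompletableFuture plays the role of the stage returned by the JAX-RS client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class StageChainingSketch {

    // Hypothetical minimal stand-ins for the article's Temperature and Forecast types.
    static final class Temperature {
        final double celsius;
        Temperature(double celsius) { this.celsius = celsius; }
    }

    static final class Forecast {
        final Temperature temperature;
        Forecast(Temperature temperature) { this.temperature = temperature; }
    }

    // Registers the mapping step; it runs only once the temperature is available.
    static CompletionStage<Forecast> toForecast(CompletionStage<Temperature> pending) {
        return pending.thenApply(Forecast::new);
    }

    public static void main(String[] args) {
        // Plays the role of the stage returned by rx().get(Temperature.class).
        CompletableFuture<Temperature> pending = new CompletableFuture<>();
        CompletableFuture<Forecast> stage = toForecast(pending).toCompletableFuture();

        System.out.println("done before the response arrives: " + stage.isDone());
        pending.complete(new Temperature(21.5)); // simulate the response arriving
        System.out.println("done after the response arrives: " + stage.isDone());
        System.out.println("forecast temperature: " + stage.join().temperature.celsius);
    }
}
```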

However, we want to use another variant of the get() method, together with the RxJava Flowable invoker from the Jersey project, to get a Flowable from RxJava instead of a CompletionStage. Combining multiple asynchronous results takes much less code with Flowable than with CompletionStage.

With the following code, we will call an external service and return a Flowable:

    Flowable<Forecast> flowable = temperatureServiceTarget
            .register(RxFlowableInvokerProvider.class)
            .request()
            .rx(RxFlowableInvoker.class)
            .get(Temperature.class)
            .map(temperature -> new Forecast(temperature));

We register an additional provider, RxFlowableInvokerProvider, which allows us to request RxFlowableInvoker later. This invoker then gives us the Flowable return type from RxJava. These classes are not part of the JAX-RS API, so we must add them with the Jersey RxJava2 library:

    <dependency>
        <groupId>org.glassfish.jersey.ext.rx</groupId>
        <artifactId>jersey-rx-client-rxjava2</artifactId>
        <version>2.26</version>
    </dependency>

At first sight, it seems we have made the code more complicated while doing the same thing. But a Flowable instance allows us to combine multiple calls easily:

    Flowable.concat(flowable1, flowable2)
            .doOnNext(forecast -> {
                forecasts.add(forecast);
            })
            .doOnComplete(() -> {
                asyncResponse.resume(Response.ok(forecasts).build());
            })
            .doOnError(asyncResponse::resume)
            .subscribe();

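Each forecast received from either flowable is added to a list of forecasts. When both flowables have completed, we send the list as the response; if an error occurs, we send an error response instead. The final call to subscribe() is necessary to activate the pipeline, otherwise the handlers would never be invoked. Note that concat subscribes to its sources one after another and emits their results in that order; RxJava's Flowable.merge subscribes to all sources at once if the order of results doesn’t matter.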
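The collect-then-respond pattern can also be sketched with the JDK alone, which may help to see the control flow without RxJava. This is an analogy, not the Jersey or RxJava API: the resume callback below is a hypothetical stand-in for asyncResponse::resume, and plain strings stand in for forecasts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;

class CollectAndRespondSketch {

    // Without blocking, waits for every call and then hands either the complete
    // list or a failure (wrapped in a CompletionException) to `resume` once.
    // `resume` is a hypothetical stand-in for asyncResponse::resume.
    static void respondWhenAllDone(List<CompletableFuture<String>> calls,
                                   Consumer<Object> resume) {
        CompletableFuture
                .allOf(calls.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> calls.stream()
                        .map(CompletableFuture::join) // all calls are complete here
                        .collect(Collectors.toList()))
                .whenComplete((forecasts, error) -> {
                    if (error != null) {
                        resume.accept(error);
                    } else {
                        resume.accept(forecasts);
                    }
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> london = new CompletableFuture<>();
        CompletableFuture<String> beijing = new CompletableFuture<>();
        respondWhenAllDone(Arrays.asList(london, beijing),
                result -> System.out.println("response: " + result));

        // Nothing is printed until both simulated responses have arrived.
        beijing.complete("Beijing: 25");
        london.complete("London: 18");
    }
}
```

In this sketch the resulting list follows the order of the calls passed in, regardless of which simulated response arrives first.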

You may have also noticed the asyncResponse variable used to send the final response or signal an error. This is a JAX-RS asynchronous response instance, which is used to complete a REST response at a later time, when the data is available, without blocking the initial processing thread. Using the asynchronous response helps us save thread resources while waiting for results from external REST services. To turn on asynchronous processing in our REST endpoint, we inject javax.ws.rs.container.AsyncResponse as a REST method argument annotated with @Suspended. We also change the return type to void because we’ll be building the response using the AsyncResponse instance:

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public void getForecasts(@Suspended AsyncResponse asyncResponse) {
        /* ...here come some asynchronous calls to REST services... */
        asyncResponse.resume(...);
    }

Final code example

The following code will:

  • turn on asynchronous processing of REST requests in the getForecasts method
  • set a 5-minute timeout on the asynchronous response
  • execute the temperature service twice, for London and Beijing, without waiting for results
  • combine the results into a sequence of forecasts
  • add every forecast in the sequence into a list
  • send the complete list when all results have been processed
  • send an error result in case of an exception
  • register the handlers with the subscribe method

    // Builds a Flowable that requests the temperature for one location
    // and maps the result to a forecast.
    private Flowable<Forecast> getTemperature(String location) {
        return temperatureTarget
                .register(RxFlowableInvokerProvider.class)
                .resolveTemplate("city", location)
                .request()
                .rx(RxFlowableInvoker.class)
                .get(Temperature.class)
                .map(temperature -> new Forecast(location, temperature));
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public void getForecasts(@Suspended AsyncResponse asyncResponse) {
        List<Forecast> forecasts = new ArrayList<>();
        asyncResponse.setTimeout(5, TimeUnit.MINUTES);
        Flowable.concat(getTemperature("London"), getTemperature("Beijing"))
                .doOnNext(forecast -> {
                    forecasts.add(forecast);
                })
                .doOnComplete(() -> {
                    // all results collected: send the complete list
                    asyncResponse.resume(Response.ok(forecasts).build());
                })
                .doOnError(asyncResponse::resume)
                .subscribe();
    }
