Starting with Spring 5, AsyncRestTemplate is deprecated in favour of WebClient from spring-webflux. The good news is that you don't have to adopt the reactive WebFlux stack in order to use WebClient. You can still call WebClient in a synchronous, blocking way from Spring Web MVC, so your existing projects can continue to work.
When converting one of my existing applications from AsyncRestTemplate to WebClient, I found there wasn’t sufficient documentation or examples to do a quick replacement.
After a bit of searching and learning by trial and error, here are the code snippets I eventually came up with.
Here is the code gist
Reference:
- https://docs.spring.io/spring/docs/5.0.0.RELEASE/spring-framework-reference/web-reactive.html#webflux-client
- https://www.callicoder.com/spring-5-reactive-webclient-webtestclient-examples/
Create WebClient
Requirements
- Create one WebClient bean for each service that requires specific configuration.
- Configure timeouts: connection timeout, read timeout, write timeout, etc.
- Keep connections alive.
- Use a connection pool to reuse connections for better performance.
- Configure the number of Netty worker threads to make the best use of the available CPU cores.
Code Snippet
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
@Configuration
@Slf4j
public class WebClientConfig {
@Value("${application.webClient.connection.connectTimeoutMs:20000}")
private int connectTimeoutMs;
@Value("${application.webClient.connection.readTimeoutMs:20000}")
private int readTimeoutMs;
@Value("${application.webClient.connection.writeTimeoutMs:20000}")
private int writeTimeoutMs;
@Value("${application.webClient.connection.maxConnections:32}")
private int maxConnections;
@Value("${application.webClient.connection.maxWaitForConnectionMS:45000}")
private long maxWaitForConnectionMS;
@Value("${application.webClient.loopResourceWorkerCount:8}")
private int loopResourceWorkerCount;
/**
* Base web client with common configurations.
* You can create a web client for each specific service using baseWebClient.mutate()...build()
*/
@Bean
@Qualifier("baseWebClient")
public WebClient baseWebClient(WebClient.Builder webClientBuilder, ReactorResourceFactory resourceFactory) {
Function<HttpClient, HttpClient> mapper =
httpClient ->
httpClient.tcpConfiguration(client ->
client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
.option(ChannelOption.TCP_NODELAY, true)
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(readTimeoutMs, TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(writeTimeoutMs, TimeUnit.MILLISECONDS))))
.keepAlive(true);
return webClientBuilder
.clientConnector(new ReactorClientHttpConnector(resourceFactory, mapper))
.build();
}
@Bean
// WebClient for service A. Each service that requires specific configuration should create its own WebClient bean.
@Qualifier("serviceAWebClient")
public WebClient serviceAWebClient(@Qualifier("baseWebClient") WebClient baseWebClient) {
// Add default behaviors here
return baseWebClient.mutate()
.defaultHeader("Accept", "application/json")
.defaultHeader("Content-Type", "application/json")
.build();
}
/*
By default, HttpClient participates in the global Reactor Netty resources held in reactor.netty.http.HttpResources.
It uses max(number of CPU cores, 4) worker threads, unless overridden by the system property "reactor.netty.ioWorkerCount" (see LoopResources.java). Refer to https://github.com/spring-projects/spring-framework/blob/master/src/docs/asciidoc/web/webflux-webclient.adoc
However, availableProcessors() in Java 8 (at least in older builds that are not container-aware) returns the number of cores of the host (where Docker runs), not the vCPUs assigned to the container.
Therefore, you may get a large number of cores when running in Docker on an OpenShift cluster.
There are two ways to solve this:
1. Set the JVM system property -Dreactor.netty.ioWorkerCount=8.
2. Create a customized ReactorResourceFactory bean with an explicit worker count (not using the global Netty resources). This is the recommended way. Refer to https://stackoverflow.com/questions/53948033/how-to-customize-netty-with-spring-boot-2-1-webflux and https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-developing-web-applications.html#boot-features-reactive-server-resources
*/
@Bean
@Qualifier("resourceFactory")
public ReactorResourceFactory resourceFactory() {
ReactorResourceFactory reactorResourceFactory = new ReactorResourceFactory();
reactorResourceFactory.setUseGlobalResources(false); // Don't use the global netty resource, since we want to explicitly control the resource.
reactorResourceFactory.setConnectionProvider(ConnectionProvider.fixed("ConnPro", maxConnections, maxWaitForConnectionMS));
reactorResourceFactory.setLoopResourcesSupplier(() -> LoopResources.create("LoopRes", loopResourceWorkerCount, true));
return reactorResourceFactory;
}
}
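To make the worker-count rule from the comment above concrete, here is a minimal plain-JDK sketch. It is not Reactor Netty's actual code: the class `WorkerCountSketch` and the method `resolveWorkerCount` are hypothetical names, and falling back to the default when the property is missing or invalid is my assumption. It only illustrates "max(cores, 4) unless the property overrides it".

```java
final class WorkerCountSketch {

    // Property name as quoted in the comment of the config class above.
    static final String WORKER_COUNT_PROPERTY = "reactor.netty.ioWorkerCount";

    /**
     * Resolves a worker count: an explicit positive property value wins,
     * otherwise max(availableProcessors, 4) is used.
     * Treating blank, non-numeric or non-positive values as "not set" is an assumption.
     */
    static int resolveWorkerCount(String propertyValue, int availableProcessors) {
        int fallback = Math.max(availableProcessors, 4);
        if (propertyValue == null || propertyValue.trim().isEmpty()) {
            return fallback;
        }
        try {
            int configured = Integer.parseInt(propertyValue.trim());
            return configured > 0 ? configured : fallback;
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Run this inside the container to see how many cores the JVM actually reports.
        int cores = Runtime.getRuntime().availableProcessors();
        int workers = resolveWorkerCount(System.getProperty(WORKER_COUNT_PROPERTY), cores);
        System.out.println("availableProcessors=" + cores + ", workers=" + workers);
    }
}
```

Running the main method inside the container is a quick way to check whether the JVM sees the host's cores or the container's vCPUs before deciding on a value for loopResourceWorkerCount.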
Configure 2-way SSL for WebClient
Requirements
- One of our services requires a client-side certificate, so we need to configure an SslContext for its WebClient.
Code Snippet
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
@Configuration
@Slf4j
public class ResourceServiceWebClientConfig {
@Value("${application.serviceB.ssl.keyStore.path}")
private Resource keyStoreResource;
@Value("${application.serviceB.ssl.keyStore.type}")
private String keyStoreType;
@Value("${application.serviceB.ssl.keyStore.password}")
private String keyStorePassword;
@Value("${application.serviceB.ssl.trustStore.path}")
private Resource trustStoreResource;
@Value("${application.serviceB.ssl.trustStore.type}")
private String trustStoreType;
@Value("${application.serviceB.ssl.trustStore.password}")
private String trustStorePassword;
// ... other configs same as the previous example.
@Bean
@Qualifier("serviceBWebClient")
public WebClient serviceBWebClient(final WebClient.Builder webClientBuilder)
throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
SslContext sslContext = createSslContext();
Function<HttpClient, HttpClient> mapper =
httpClient ->
httpClient.tcpConfiguration(client ->
client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
.option(ChannelOption.TCP_NODELAY, true)
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(readTimeoutMs, TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(writeTimeoutMs, TimeUnit.MILLISECONDS)))
)
.keepAlive(true)
// Configure SSL
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
return webClientBuilder
.clientConnector(new ReactorClientHttpConnector(resourceFactory, mapper))
.build();
}
//Refer to https://stackoverflow.com/questions/53341607/how-to-configure-a-reactive-webclient-to-use-2-way-tls
private SslContext createSslContext()
throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException {
final KeyStore keyStore = getStore(keyStoreResource, keyStorePassword, keyStoreType);
final KeyStore trustStore = getStore(trustStoreResource, trustStorePassword,
trustStoreType);
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keyStore, keyStorePassword.toCharArray());
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
return SslContextBuilder.forClient()
.keyManager(keyManagerFactory)
.trustManager(trustManagerFactory)
.build();
}
private KeyStore getStore(final Resource storeResource, final String password, final String keyStoreType)
throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException {
final KeyStore result;
final String storePath = storeResource.getURL().getPath()
.replaceAll(Pattern.quote("\\"), "/");
try (FileInputStream stream = new FileInputStream(new File(storePath))) {
final String type = keyStoreType == null ? KeyStore.getDefaultType() : keyStoreType;
result = KeyStore.getInstance(type);
result.load(stream, password.toCharArray());
}
return result;
}
}
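The getStore helper above reads the store through a file path. As a possible alternative, the same loading step can work on any InputStream, which in the Spring code could come from `keyStoreResource.getInputStream()` (that would also cover resources packaged inside a jar). The sketch below uses only the JDK. The class `KeyStoreStreamSketch`, its methods and the "changeit" password are hypothetical, and the empty in-memory PKCS12 store exists only so the example runs without a real certificate file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

final class KeyStoreStreamSketch {

    /** Loads a key store from a stream; uses the JVM default type when storeType is null. */
    static KeyStore loadStore(InputStream in, String password, String storeType)
            throws GeneralSecurityException, IOException {
        String type = storeType == null ? KeyStore.getDefaultType() : storeType;
        KeyStore store = KeyStore.getInstance(type);
        store.load(in, password.toCharArray());
        return store;
    }

    /** Builds an empty PKCS12 store in memory so the demo needs no file on disk. */
    static byte[] emptyPkcs12(String password) throws GeneralSecurityException, IOException {
        KeyStore empty = KeyStore.getInstance("PKCS12");
        empty.load(null, password.toCharArray()); // a null stream initialises an empty store
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        empty.store(out, password.toCharArray());
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        String password = "changeit"; // placeholder password, for the demo only
        try (InputStream in = new ByteArrayInputStream(emptyPkcs12(password))) {
            KeyStore store = loadStore(in, password, "PKCS12");
            System.out.println("type=" + store.getType() + ", entries=" + store.size());
        }
    }
}
```

The resulting KeyStore can then be passed to KeyManagerFactory.init(...) or TrustManagerFactory.init(...) exactly as in createSslContext.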
Use WebClient
Requirements
- Log request/response only on certain exceptions
- WebClient can be configured to log every request (refer to this link), but here I only need to log on failure so that the support team can get the details.
- Retry on certain exceptions
Code Snippet
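import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
@Service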
@Slf4j
public class SomeServiceProvider {
private final WebClient webClient;
@Value("${application.someService.url}")
private String serviceUrl;
@Value("${application.someService.header.accept}")
private String serviceHeaderAccept;
@Value("${application.someService.header.contentType}")
private String serviceHeaderContentType;
@Getter
@Value("${application.someService.maxRetries}")
private int maxRetries;
@Autowired
public SomeServiceProvider(
@Qualifier("webClient") final WebClient webClient) {
this.webClient = webClient;
}
private Mono<SomeServiceResponse> callSomeService(
SomeServiceRequest someServiceRequestBody) {
return webClient
.post()
.uri(this.serviceUrl)
.syncBody(someServiceRequestBody)
.retrieve()
.bodyToMono(SomeServiceResponse.class)
.retry(maxRetries, this::shouldRetryOnError)
// Still failing after all retries: log the request and response details
.doOnError(WebClientResponseException.class, e ->
logErrorForWebClientResponseException(someServiceRequestBody, e)
);
}
private boolean shouldRetryOnError(Throwable throwable) {
if (throwable instanceof WebClientResponseException) {
HttpStatus statusCode = ((WebClientResponseException) throwable).getStatusCode();
if (statusCode.is4xxClientError()) {
// Don't retry on 4XX client side error
return false;
}
}
return true;
}
private void logErrorForWebClientResponseException(Object requestBody, WebClientResponseException ex) {
if (log.isErrorEnabled()) {
log.error("Calling some service and got 4XX or 5XX error. "
+ "Request URL {} "
+ "Request Body {} "
+ "Response Code {} "
+ "Response Body {}",
this.serviceUrl,
JsonUtils.object2JsonOrFallback2ToString(requestBody),
ex.getStatusCode(),
ex.getResponseBodyAsString(),
ex);
}
}
}
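The retry behaviour used above (one initial call, then up to maxRetries further attempts, skipping retries on 4XX) can be illustrated without Reactor. The following is a simplified, blocking plain-JDK sketch of those semantics, not how Reactor implements retry. `RetrySketch`, `HttpStatusException` and `callWithRetry` are hypothetical names, and `HttpStatusException` is only a stand-in for WebClientResponseException.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class RetrySketch {

    /** Hypothetical stand-in for WebClientResponseException: carries only the HTTP status. */
    static final class HttpStatusException extends RuntimeException {
        final int status;

        HttpStatusException(int status) {
            super("HTTP " + status);
            this.status = status;
        }
    }

    /** Mirrors shouldRetryOnError: retry on everything except 4XX client errors. */
    static boolean shouldRetry(Throwable t) {
        if (t instanceof HttpStatusException) {
            int status = ((HttpStatusException) t).status;
            return !(status >= 400 && status < 500);
        }
        return true;
    }

    /** Runs the call once, then retries up to maxRetries times while the predicate allows it. */
    static <T> T callWithRetry(Supplier<T> call, int maxRetries, Predicate<Throwable> retryOn) {
        int retriesLeft = maxRetries;
        while (true) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (retriesLeft <= 0 || !retryOn.test(e)) {
                    throw e; // retries exhausted or error not retryable: surface the last error
                }
                retriesLeft--;
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Fails twice with 503, then succeeds on the third attempt.
        String result = callWithRetry(() -> {
            if (++attempts[0] < 3) {
                throw new HttpStatusException(503);
            }
            return "ok";
        }, 3, RetrySketch::shouldRetry);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

With this shape, a 404 fails immediately after a single attempt, while a 503 is retried until it succeeds or the retry budget is used up. Only then does the error reach the caller, which corresponds to the point where the doOnError logging above kicks in.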
This article was originally published on https://coddicting.wordpress.com/.