Spring 서버에서 http 요청을 보내기 위한 방법은 여러가지가 있는데 (Apache HTTP 컴포넌트 인 HttpClient / Spring Framework 에서 지원하는 RestTemplate) 기존 Spring 개발자들이 제일 많이 사용하던 RestTemplate 이 Spring 에서 공식적으로 향후 deprecated 될 예정이므로 보다 현대적인 WebClient 를 사용하라고 권장하였기 때문에 도입
리액티브 프로그래밍 에서 사용하기 위한 웹 호출 인터페이스.
멀티스레드를 지원함.
비동기 처리
리턴타입 변환 T → Mono, Flux
컨트롤러와 서비스 메소드에서의 리턴 타입을 전환하였다. 기존에 일반 타입으로 반환하였다면, Reactor는 리액티브 스트림을 지원하는 Mono와 Flux를 제공하기에 리턴 타입도 Mono, Flux로 전환되어야 한다.
리액티브 라이브러리의 경우 이미 Mono와 Flux의 리턴타입을 가질것이며, 직접 데이터를 생성하는 경우라면 just()
메서드로 감싸서 전달하도록 한다.
비동기 작업의 순서유지를 위해 flatMap 으로 체이닝
WebFlux 전환 시, 내부엔 블로킹이 존재하지 않는다. 작업들은 비동기적으로 처리될 것이며 라인 순서에 따른 실제 코드의 동작 순서는 일치하지 않게된다(메서드를 호출해도 실제 동작은 나중에 처리될것이기 때문이다). 이때 flatMap을 통해 이전 비동기 작업이 끝난 후 다음 로직들이 처리되도록 순서를 보장시켜줄 수 있다.
내부가 동기적인 동작이라면 map 으로 체이닝
flatMap과 map의 차이점은 전달하는 함수의 리턴 타입이다. flatMap에 전달하는 함수의 리턴 타입은 Mono나 Flux와 같은 리액티브 API이며, 이는 비동기 동작이 있는 함수를 전달하기 위해서이다.
하지만, 블로킹될 일이 없는 로직으로만 구성되고 데이터를 직접 생성한다면 map 함수를 통해 체이닝 할 수 있다. 그렇기에 map에 전달하는 함수는 일반적인 오브젝트(T)를 리턴 한다
@Slf4j
@RestController
public class TestController() {
private final TestService testService;
public TestController(Testservice testService) {
this.testService = testService;
}
@GetMapping("/test")
public Mono<TestDTO> test() {
log.info("START");
Mono<TestDTO> dto = testService.test();
log.info("END");
return dto;
}
}
@Service
public class TestService() {
private final WebClient webClient;
private final String baseUrl = "<http://localhost:8080>";
public TestService(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder
.baseUrl(baseUrl)
.build();
}
public Mono<TestDTO> test() {
for (int i = 0; i < 100; i++) {
int finalI = i;
req().subscribe(driverDto ->
System.out.println("driverDto.getId() = " + driverDto.getId() + " @@@ " + finalI));
}
return webClient.get()
.uri("/test/" + 2000000118)
.exchangeToMono(response -> {
if (response.statusCode().equals(HttpStatus.OK)) {
System.out.println("DriverService.test @@@ GET");
return response.bodyToMono(DriverDto.class);
} else {
return Mono.empty();
}
});
}
private Mono<TestDTO> seq() {
return webClient.get()
.uri("/test/" + 2000000118)
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono(response -> {
if (response.statusCode().equals(HttpStatus.OK)) {
return response.bodyToMono(DriverDto.class);
} else {
return Mono.empty();
}
});
}
}
→ 결과
TestService
의 for 문에 req() 실행생성
// create 메서드 사용
WebClient webClient = WebClient.create();
WebClient webClient = WebClient.create("<http://localhost:8000>");
// builder 사용
WebClient webClient = WebClient.builder()
.baseUrl("<http://localhost:8000>")
.defaultHeader("key", "value")
.build();
요청
// GET
// toMono(), toFlux() body 데이터로 받을 때 사용
Mono<User> userMono = webClient.get()
.uri("/users/{userNo}", userNo)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(User.class);
// toEntity() status, headers, body 를 포함하는 ResponseEntity 타입으로 받을 때 사용
Mono<ResponseEntity<User>> monoEntity = webClient.get()
.uri("/users/{userNo}", userNo)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.toEntity(User.class);
// POST
Mono<User> userMono = webClient.post()
.uri("/users")
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(userDto), User.class)
.retrieve()
.bodyToMono(User.class);
// PUT
Mono<User> userMono = webClient.put()
.uri("/users/{userNo}", userNo)
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(userDto), User.class)
.retrieve()
.bodyToMono(User.class);
// DELETE
Mono<Void> voidMono = webClient.delete()
.uri("/users/{userNo}", userNo)
.retrieve()
.bodyToMono(Void.class);
POST 나 PUT 등 body 값 설정을 위한 RequestBodySpec
인터페이스
interface RequestBodySpec extends RequestHeadersSpec<RequestBodySpec> {
RequestBodySpec contentLength(long contentLength);
RequestBodySpec contentType(MediaType contentType);
RequestHeadersSpec<?> bodyValue(Object body);
<T, P extends Publisher<T>> RequestHeadersSpec<?> body(P publisher, Class<T> elementClass);
<T, P extends Publisher<T>> RequestHeadersSpec<?> body(P publisher,
ParameterizedTypeReference<T> elementTypeRef);
RequestHeadersSpec<?> body(Object producer, Class<?> elementClass);
RequestHeadersSpec<?> body(Object producer, ParameterizedTypeReference<?> elementTypeRef);
RequestHeadersSpec<?> body(BodyInserter<?, ? super ClientHttpRequest> inserter);
}
비동기 처리 방법
Mono<ResponseDTO> responseDtoMono = webClient.get()
.uri("/api/v2/test")
.headers(httpHeaders -> httpHeaders.setAll(headerMap))
.retrieve()
.bodyToMono(ResponseDTO.class)
.block();
block()
을 이용해서 객체로 변환하면 Reactive Pipeline
을 사용하는 장점이 없어지고 모든 호출이 main 쓰레드에서 호출되기 때문에 Spring 측에서는 block()
은 테스트 용도 외에는 가급적 사용하지 말라고 권고하고 있습니다.
대신 완벽한 Reactive 호출은 아니지만 Lazy Subscribe
를 통한 Stream
또는 Iterable
로 변환 시킬 수 있는 Flux.toStream()
, Flux.toIterable()
함수를 제공하고 있습니다.
// Mono
// flux() 를 통해 Flux 로 변환 후 Optional 처리 (onError 처리 필요)
User user = webClient.get()
.uri("/users/{userNo}", userNo)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(User.class)
.flux()
.toStream()
.findFisrt()
.orElse(defaultValue);
// Flux
List<User> users = webClient.get()
.uri("/users")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(User.class)
.toStream()
.collect(Collectors.toList());
retrieve, exchange
HTTP 호출 결과를 가져오는 방법으로 retrieve()
와 exchange()
가 존재한다.
exchange()
를 이용하면 세세한 컨트롤이 가능하지만 Response 에 대한 응답 body 를 소비(comsume) 하지 않으면 memory leak 이나 connection leak 가능성 때문에 Spring 에서는 retrieve()
를 권고하고 있다.
Mono<User> userMono = webClient.get()
.uri("/users" + userNo)
.accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(response -> response.bodyToMono(User.class));
error 처리
try {
Mono<User> userMono = webClient.get()
.uri("/users/{userNo}", userNo)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatus::isError, error -> Mono.error(new UserException("error!")))
.bodyToMono(User.class);
} catch (UserException e) {
// handling
}
try {
Mono<User> userMono = webClient.get()
.uri("/users/{userNo}", userNo)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(User.class)
.onErrorResume(Mono::error)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2))
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> new UserException(retrySignal.failure()))
} catch (UserException e) {
// handling
}
설정
memory size
Spring WebFlux 에서는 메모리 문제를 피하기 위해 codec 처리를 위한 in-memory buffer 값이 256KB 로 기본 설정되어 있다. 해당 값을 바꾸기 위한 설정
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(10 * 1024 * 1024)) // 10MB
.build();
WebClient webClient = WebClient.builder()
.exchangeStrategies(exchangeStrategies)
.build();
spring.codec.max-in-memory-size=10MB
logging
WebClient 는 HttpClient 를 이용하는데, 기본은 Reactor Netty 이며, Jetty 등 다른 것도 사용가능
필터를 통해 request, response 를 조작하여 출력
public ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(request -> {
log.debug("Request {} {}", request.method().name(), request.url());
request.headers().forEach((name, values) -> values.forEach(value -> log.debug("{} : {}", name, value)));
});
return Mono.just(request);
});
}
public ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(response -> {
log.debug("Response {} {}", response.statusCode().value(), response.statusCode().getReasonPhrase());
response.headers().asHttpHeaders().forEach((name, values) -> values.forEach(value -> log.debug("{} : {}", name, value)));
if (response.statusCode() != null && (response.statusCode().is4xxClientError() || response.statusCode().is5xxServerError())) {
return response.bodyToMono(String.class)
.flatMap(body -> {
LOG.debug("Body is {}", body);
return Mono.just(response);
});
} else {
return Mono.just(response);
}
};
}
// WebClient 객체 생성 시 filter 에 등록
WebClient webClient = WebClient.builder()
...
.filter(logRequest())
.filter(logResponse())
...
.build();
ExchangeStrateges 와 logging level 설정
ExchangeStrategies exchangeStrategies = ExchangeStrategies.withDefaults();
exchangeStrategies
.messageWriters().stream()
.filter(LoggingCodecSupport.class::isInstance)
.forEach(writer -> ((LoggingCodecSupport)writer).setEnableLoggingRequestDetails(true));
webClient = WebClient.builder()
.exchangeStrategies(exchangeStrategies)
...
logging.level.org.springframework.web.reactive.function.client.ExchangeFunctions=DEBUG
Jetty HttpClient 이용
implementation group: 'org.eclipse.jetty', name: 'jetty-reactive-httpclient', version: '1.1.11'
@Bean
public WebClient webClient() {
SslContextFactory.Client sslContextFactory = new SslContextFactory.Client();
HttpClient httpClient = new HttpClient(sslContextFactory) {
@Override
public Request newRequest(URI uri) {
Request request = super.newRequest(uri);
// request
// method / uri
request.onRequestBegin(theRequest ->
log.info("[Request] {} {}", theRequest.getMethod(), theRequest.getURI()));
// header
request.onRequestHeaders(theRequest -> loggingHeaders(theRequest.getHeaders()));
// body
request.onRequestContent((theRequest, content) -> loggingBody(content));
// response
request.onResponseBegin(theResponse -> log.info("[Response] {}", theResponse.getStatus()));
// header
request.onResponseHeaders(theResponse -> loggingHeaders(theResponse.getHeaders()));
// body
request.onResponseContent((theResponse, content) -> loggingBody(content));
return request;
}
void loggingHeaders(HttpFields headers) {
Map<String, Object> map = new HashMap<>();
headers.forEach(httpField -> {
List<String> list = new ArrayList<>(Arrays.asList(httpField.getValues()));
map.put(httpField.getName(), list);
});
log.info("Headers : {}", map);
}
void loggingBody(ByteBuffer content) {
StringBuilder sb = new StringBuilder();
CharBuffer bodyCharBuffer = StandardCharsets.UTF_8.decode(content);
sb.append(String.format("%1.600s", bodyCharBuffer));
if (bodyCharBuffer.toString().length() > 600) {
sb.append("...");
}
log.info("Body : {}", sb);
}
};
return webClientBuilder
.clientConnector(new JettyClientHttpConnector(httpClient))
.codecs(ClientCodecConfigurer::defaultCodecs)
.build();
}
Netty HttpClient 이용
// create Netty HttpClient
HttpClient httpClient = HttpClient
.create()
.wiretap("reactor.netty.http.client.HttpClient", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL)
Having enabled the wiretap, each request and response will be logged in full detail.
Next, we have to set the log level of Netty's client package reactor.netty.http.client to DEBUG:
logging.level.reactor.netty.http.client=DEBUG
// build WebClient
WebClient
.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build()
timeout
Response Timeout
HttpClient client = HttpClient.create()
.responseTimeout(Duration.ofSeconds(10));
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(client))
.build();
Connection Timeout
HttpClient client = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
Read/Write Timeout
HttpClient client = HttpClient.create()
.doOnConnected(conn -> conn
.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS))
.addHandler(new WriteTimeoutHandler(10)));
SSL/TLS Timeout
HttpClient.create()
.secure(spec -> spec.sslContext(SslContextBuilder.forClient())
.defaultConfiguration(SslProvider.DefaultConfigurationType.TCP)
.handshakeTimeout(Duration.ofSeconds(30))
.closeNotifyFlushTimeout(Duration.ofSeconds(10))
.closeNotifyReadTimeout(Duration.ofSeconds(10)));
Proxy Timeout
HttpClient.create()
.proxy(spec -> spec.type(ProxyProvider.Proxy.HTTP)
.host("proxy")
.port(8080)
.connectTimeoutMillis(30000));
WebClient Request 시 적용하는 방법
webClient.get()
.uri("/users")
.retrieve()
.bodyToFlux(User.class)
.timeout(Duration.ofSeconds(10));