전통적인 웹 방식 (스프링 MVC)

비동기-논블로킹 방식
C10K 문제
JVM 환경에서 리액티브 프로그래밍의 표준 API 사양으로비동기 데이터 스트림과 논블로킹-백프레셔(Back-Pressure)에 대한 사양을 제공
리액티브 스트림 이전의 비동기식 애플리케이션에서는 멀티 코어를 제대로 활용하기 위해 복잡한 병렬 처리 코드가 필요
처리할 데이터가 무한정 많아져서 시스템의 한계를 넘어서는 경우 애플리케이션은병목 현상(bottleneck)이 발생하거나 심각한 경우 애플리케이션이 정지되는 경우도 발생할 수 있음(논블로킹-백프레셔로 해결)
Netflix, Vmware(Pivotal), Lightbend, Red Hat과 같은 유명 회사들이 표준화에 참여 중
리액티브 스트림 인터페이스

Publisher : 데이터를 생성하고 구독자에게 통지
Subscriber : 데이터를 구독하고 통지 받은 데이터를 처리
Subscription : Publisher, Subscriber간의 데이터를 교환하도록 연결하는 역할을 하며 전달받을 데이터의 개수와 구독을 해지할 수 있다
Processor : Publisher, Subscriber을 모두 상속받은 인터페이스
리액티브 스트림에서 Publisher와 Subscriber 간의 데이터 처리 흐름

비동기 스트림 처리를 위한 표준으로써 next는 다음신호를 담고 complete는 신호가 끝난것 그리고 error은 신호보내는 도중 에러가 발생한 것을 의미한다.Publisher가 전송하면 데이터는 sequence 대로 전송한다. 그러면 Subscriber가 데이터를 수신한다.next, complete, error 신호를 발생시킨다.

스프링 에코시스템 범주에 있는 리액티브 스트림 구현체
리액티브 스트림에서 사용하는 용어와 규칙을 동일하게 사용하며 리액터만의 다양한 기능들도 제공
리액티브 스트림 사양대로 onComplete 또는 onError 시그널이 발생할 때까지 onNext를 사용해 구독자에게 데이터를 통지
Mono는 0-1개의 결과만을 처리하기 위한 Reactor 객체
Flux는 0-N개의 결과물을 처리하기 위한 Reactor 객체
기본 사용 예시
// Flux 생성
Flux<Integer> seq = Flux.just(1, 2, 3);
// 구독
seq.subscribe(System.out::println);
// 1-2-3-complete 3개의 next 신호를 발생하고 마지막에 complete 신호를 통해 시퀀스를 끝낸다
Flux.just(1, 2, 3);
// 아무런 시퀀스가 없는 경우 complete 신호만 발생 시킨다
Flux.just();
// 1-complete Mono는 발생할 수 있는 최대 값이 1개
Mono.just(1);
구독과 신호 발생
// 시퀀스는 바로 신호를 발생하지 않고 구독 시점에서 신호를 발생 시킨다
Flux.just(1, 2, 3)
.doOnNext(i -> System.out.println("호출: " + i))
.subcribe(i -> System.out.println("출력: " + i));
→ 결과
호출: 1
출력: 1
호출: 2
출력: 2
호출: 3
출력: 3
Subscriber 인터페이스 사용 방법
구독이 발생 하면 onSubscribe() 호출 다음값 요청 시 onNext() 호출 오류 발생 시 onError() 호출 요청이 끝날 시 onComplete() 호출
Flux<Integer> seq = Flux.just(1, 2, 3);
seq.subscribe(new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
// 구독 시작
this.subscription = s;
this.subscription.request(1);
}
@Override
public void onNext(Integer i) {
System.out.println("Costomer가 Publisher에게 데이터 요청: " + i);
this.subscription.request(1);
}
@Override
public void onError(Throwable t) {
System.out.println("Subscriber.onError: " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("Subscriber.onComplete");
}
});
seq.subscribe() 에서 전달한 임의 Subscriber 객체를 onSubscribe() 에서 인자로 받아서 이를 필드로 저장하여 사용한다. request(1) 은 한개의 데이터를 요청한다는 뜻이다. 만약 모든 데이터를 한번에 받고 싶다면 다음과 같이 지정하면 된다
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
this.subscription.request(Long.MAX_VALUE);
}
콜드 시퀀스 / 핫 시퀀스
Cold Sequence: 구독(subscribe) 시점부터 데이터를 새로 생성함
Hot Sequence: ****구독하는 customer 와 상관 없이 데이터를 생성함. 미리 생성해 둔 데이터로 동작 함.
Flux.just() 로 생성한 시퀀스가 콜드 시퀀스이고 subscribe가 발생하지 않는다
콜드 시퀀스 예시
Flux<Integer> flux = Flux.range(1, 5).log();
flux.subscribe(i -> System.out.println("i : " + i));
flux.subscribe(i -> System.out.println("i : " + i));
→ 결과
onSubscribe()
request()
onNext(1)
i : 1
onNext(2)
i : 2
onNext(3)
i : 3
onNext(4)
i : 4
onNext(5)
i : 5
onComplete()
onSubscribe()
request()
onNext(1)
i : 1
onNext(2)
i : 2
onNext(3)
i : 3
onNext(4)
i : 4
onNext(5)
i : 5
onComplete()
핫 시퀀스 예시
Flux<Integer> flux = Flux.range(1, 5).log().cache();
flux.subscribe(i -> System.out.println("i : " + i));
flux.subscribe(i -> System.out.println("i : " + i));
→ 결과
onSubscribe()
request()
onNext(1)
i : 1
onNext(2)
i : 2
onNext(3)
i : 3
onNext(4)
i : 4
onNext(5)
i : 5
onComplete()
i : 1
i : 2
i : 3
i : 4
i : 5