수년 전까지 대규모 어플리케이션은 수십 대의 서버, GB의 데이터, 수초의 응답시간 등 당연하게 여겨졌던 몇 시간의 유지보수 등의 특징이 있음. (이전에는 몇초정도 걸리는 응답시간이 당연했지만, 요새는 수초의 응답이 걸리는 페이지는 그냥 닫아버린다…) 하지만 오늘날은 다음과 같은 변화를 이유로 리액티브 프로그래밍 패러다임의 중요성이 점차 커지고 있다.
- 빅데이터 : 보통 빅데이터는 페타바이트(Petabyte, PB, 10 bytes) 단위로 구성되며 매일 증가한다.
- 다양한 환경 : 모바일 디바이스에서 수천 개의 멀티 코어 프로세서로 실행되는 클라우드 기반 클러스터에 이르기 까지 다양한 환경에 애플리케이션이 배포된다.
- 사용 패턴 : 사용자는 1년 내내 항상 서비스를 이용할 수 있으며 밀리초 단위의 응답 시간을 기대한다.
특히 모바일이 인터넷 트래픽을 가장 많이 일으키는 디바이스인 요즘은 이런 양상이 더욱 두드러지고 있으며, 미래에는 더 심화될 것이다. 리액티브 프로그래밍에서는 다양한 시스템과 소스에서 들어오는 데이터 항목 스트림을 비동기적으로 처리하고 합침을 통해 이러한 문제를 해결한다.
왜 좋을까??
우아한 형제들 기술 블로그에서는 간단한 예제로 사용 이유를 말하고 있다!
여러 API를 취합해서 전달해야하는 시스템에서는 SUM([각 API들의 경과시간]) 만큼 필요합니다.
반대로 리액티브로 진행할 경우, 여러 API 중 MAX([각 API들의 경과시간]) 이 필요합니다.
이전에 사용하던 것 처럼 sync 하고 blocking 하게 요청을 한다고 하자. API 요청이 n개가 있으면 이를 하나 호출하고 받을 때까지 다른일을 할 수 없기때문에 n개의 요청 시간을 모두 합친만큼 걸린다. 하지만 리액티브 시스템을 사용하면 요청을 비동기적으로 동시에 보내기때문에 이 중 가장 긴 요청 시간만큼만 걸린다.
17.1) 리액티브 매니패스토
리액티브 매니패스토는 리액티브 애플리케이션과 시스템 개발의 핵심 원칙에 대하여 정의한다.
리액티브 시스템 핵심 원칙
- 반응성(Responsive) : 빠를 뿐만 아니라, 일정하고 예상 가능한 즉각적인 반응 시간을 제공한다.
- 탄력성(Resilient) : 장애가 발생해도 시스템은 반응해야 한다. 여러 컴포넌트의 시간과 공간의 분리, 작업 위임 시 비동기적으로 위임 등의 기법이 있다.
- 유연성(Elastic) : 작업량이 변화하더라도 자동으로 관련 컴포넌트의 자원수를 조절하여 응답성을 유지한다
- 메시지 구동(Message-driven) : 비동기 메시지를 전달해 컴포넌트 끼리 느슨하게 통신한다.
애플리케이션 수준의 리액티브
애플리케이션 수준의 리액티브 프로그래밍의 기능은 주로 비동기로 작업을 수행한다는 것이다.
async / sync
동기(sync) 프로그래밍이란, 순서대로 1개의 처리가 끝난 뒤 다음 처리가 이루어지는 것이고, 비동기(async) 프로그래밍이란 동시에 처리가 시작되어, 순서에 상관없이 완료되는 방식이다.
리액티브 프레임워크와 라이브러리는 쓰레드를 퓨처, 액터, 일련의 콜백을 발생시키는 이벤트 루프등과 공유하고 처리할 이벤트를 변환하고 관리한다. 이 기술은 쓰레드보다 가볍다!
이벤트 스트림을 블록하지 않고 비동기로 처리하는 것은 최신 멀티코어 CPU (사실 요즘은 트리플 코어, 쿼드 코어 등 더 많이 나오기도 함)의 사용률을 극대화 할 수 있는 방법이다. 또한 개발자에게도 비동기 어플리케이션 구현의 추상 수준을 높일 수 있으므로 동기 블록, 경쟁 조건, 데드락 같은 저 수준의 멀티스레드 문제를 직접 처리할 필요가 없어진다.
경쟁 조건 : 공유 자원에 대해 여러 개의 프로세스가 동시에 접근을 시도할 때 접근의 타이밍이나 순서 등이 결과값에 영향을 줄 수 있는 상태
데드락 : 두 개 이상의 작업이 서로 상대방의 작업이 끝나기 만을 기다리고 있기 때문에 결과적으로 아무것도 완료되지 못하는 상태
시스템 수준의 리액티브
리액티브 시스템이란 여러 애플리케이션이 한 개의 일관적인, 회복할 수 있는 플랫폼을 구성할 수 있게 해주는 아키텍쳐이다. 이러한 과정에서 메세지 주도(message-driven) 이 사용된다. 각 메세지가 다른 수신자들에게 가지 않도록 이들 메세지를 비동기로 처리해야한다. 또한 시스템에서 장애가 발생하였을 때, 리액티브 시스템은 성능이 저하되는 게 아닌, 문제를 완전히 고립시켜 시스템을 복구해줄 뿐 아니라 이들 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 도와주는 소프트웨어 아키텍쳐다.
17.2) 리액티브 스트림과 플로 API
리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍이다. 리액티브 스트림은 잠재적으로 무한의 비동기 데이터를 순서대로 그리고 블록하지 않는 억압력(프로토콜에서 이벤트 스트림의 subscriber가 이벤트를 처리하는 속도 < publisher가 이벤트를 발생하는 속도 를 보장해서 문제가 발생하지 않도록 하는장치) 을 전제해 처리하는 표준 기술이다. 이러한 기술 덕분에 subscriber가 스레드를 블록하지 않고도 감당 못할만큼 데이터를 받는 일, 즉 부하를 방지한다.
넷플릭스, 레드햇, 트위터, 라이트밴드 및 기타 회사들이 참여한 리액티브 스트림 프로젝트에서 모든 리액티브 스트림 구현이 제공해야 하는 최소 기능 집합을 네 개의 관련 인터페이스로 정의했다. 자바 9의 새로운 java.util.concurrent.Flow 클래스 뿐 아니라 Akka 스트림(Lightbend), 리액터(Pivotal), RxJava(Netflix), Vert.x(Redhat) 등 많은 서드 파티 라이브러리에서 이들 인터페이스를 구현한다.
Flow 클래스
자바 9에서는 리액티브 프로그래밍을 제공하는 클래스 Flow를 추가했다. 이 클래스는 정적 컴포넌트 하나를 포함하고 있으며 인스턴스화할 수 없다. 리액티브 스트림 프로젝트 표준에 따라 프로그래밍 발행-구독 모델을 지원할 수 있도록 Flow 클래스는 중첩된 인터페이스 4개를 포함한다.
Publisher
메세지의 발행자 이다. 이 메세지들은 하나의 흐름(current)에 실려서 나가고, 같은 메세지가 같은 순서로 나가는 것을 보장한다.
@FunctionalInterface
public interface Publisher<T> {
void subscribe(Flow.Subscriber<? super T> s);
}
Subscriber
메세지의 구독자이며 프로토콜에서 정의한 순서로 지정된 메서드 호출을 통해 발행되어야 한다.
메세지들을 정상적으로 수신하면 onNext 함수로 하나하나 볼 수 있고, publisher가 메세지를 정상 발행하지 못하는 경우, onError 가 호출된다.
public interface Subscriber<T> {
void onSubscribe(Flow.Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
- onSubscribe onNext* (onError | onComplete)? 표시는 onSubscribe 메소드가 항상 처음 호출되고 이어서 onNext 가 여러번 호출될 수 있음을 의미한다.
Subscription
subscriber(구독자)와 발행자(publisher) 사이 관계를 조절하기 위한 인터페이스이다. request는 처리할 수 있는 이벤트의 개수(current의 개수)를 전달하며, cancel은 더 이상 이벤트를 받지 않음을 통지한다.
public interface Subscription {
void request(long n);
void cancel();
}
Processor
Processor 인터페이스는 publisher과 subscriber를 상속받을 뿐, 아무 메서드도 추가하지 않는다. 주로 구독자로써의 역할을 둘다 할 수 있고, 기본적으로 버퍼를 가지고 있어, 전달받은 메세지를 가공하거나, 잠시 유지할 수 있다.
public interface Processor<T, R> extends Flow.Subscriber<T>, Flow.Publisher<R> { }
Flow 클래스가 갖는 중첩 인터페이스들의 규칙
- Publisher는 반드시 Subscription의 request 메서드에 정의된 개수 이하의 요소만 Subscriber에게 전달해야 한다. 하지만 Publisher는 지정된 개수보다 적은 수의 요소를 onNext로 전달할 수 있으며 동작이 성공적으로 끝났으면 onComplete를 호출하고 문제가 발생하면 onError를 호출해 Subscription을 종료할 수 있다.
- Subscriber는 요소를 받아 처리할 수 있음을 Publisher에게 알려야 한다. 이런 방식으로 Subscriber는 Publisher에게 역압력을 행사할 수 있고 Subscriber가 관리할 수 없이 너무 많은 요소를 받는 일을 피할 수 있다. 더욱이 onComplete나 onError 신호를 처리하는 상황에서 Subscriber는 Publisher나 Subscription의 어떤 메서드도 호출할 수 없으며 Subscription이 취소되었다고 가정해야 한다. 마지막으로 Subscriber는 Subscription.request() 메서드 호출이 없어도 언제든 종료 시그널을 받을 준비가 되어있어야 하며 Subscription.cancel()이 호출된 이후에라도 한 개 이상의 onNext를 받을 준비가 되어 있어야 한다.
- Publisher와 Subscriber는 정확하게 Suibscription을 공유해야 하며 각각이 고유한 역할을 수행해야 한다. 그러려면 onSubscribe와 onNext 메서드에서 Subscriber는 request 메서드를 동기적으로 호출할 수 있어야 한다. 표준에서는 Subscription.cancel() 메서드는 몇 번을 호출해도 한 번 호출한 것과 같은 효과를 가져야 하며, 여러 번 이 메서드를 호출해도 다른 추가 호출에 별 영향이 없도록 스레드에 안전해야 한다고 명시한다. 같은 Subscriber 객체에 다시 가입하는 것은 권장하지 않지만 이런 상황에서 예외가 발생해야 한다고 명세서가 강제하진 않는다. 이전의 취소된 가입이 영구적으로 적용되었다면 이후의 기능에 영향을 주지 않을 가능성도 있기 때문이다.
근데 놀랍게도 아직 Java에서는 구현체가 없습니다… 하핫…
17.3) 리액티브 라이브러리 RxJava 사용하기
Flow 클래스에 정의된 인터페이스는 직접 구현하도록 의도된 것은 아니지만.. 구현체가 없다 ㅠㅠ
대신 넷플릭스에서 개발한 RxJava로 리액티브 애플리케이션 관련 예제를 더 알아보자. PxJava에서 publisher은 Observable 이고, subscriber은 Observer 이다.
import java.lang.concurrent.Flow.*; // Flow API의 import
import io.reactivex.Observable; // RxJava의 import
좋은 시스템 아키텍쳐 스타일을 유지하려면 일부에 사용된 개념의 세부사항을 전체 시스템에서 보이게 하지 않아야한다. 따라서 Observable의 추가 구조가 필요한 상황에서만 Observable을 사용하고, 그렇지 않은 경우에는 Publisher 인터페이스를 사용하는 것이 더 좋다고 한다! (ArrayList, LinkedList를 List로 구현하는 것과 같은 맥락)
Observable
Observable<String> strings = Observable.just("first", "second");
Observable<Long> onePersec = Observable.interval(1, TimeUnit.SECONDS);
// 1초 간격으로 Long 값을 반환. 이때 값은 계속 증가.
Observable은 역압력을 지원하지 않으므로 Subscription의 request 메서드를 포함하지 않는다.
Observer
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
Observer 클래스는 Observable을 구독한다. 또한 오버로드 된 기능이 많아 유연하다!
네 개의 메서드를 모두 구현해야 하는 Flow.Subscriber와 달리 onNext의 시그니처에 해당하는 람다 표현식을 전달해 Observable을 구독할 수 있다. 예를 들어 Observer를 만들 때 onNext 메서드에서 쓸 람다만 전달해도 된다.
Observable<Long> onePerSec = Observable.interval(1 ,TimeUnit.SECONDS);
// 1초 간격으로 Long 값을 반환. 이때 값은 계속 증가시키며 값을 방출하는 Observable
onePerSec.subscribe(i -> System.out.println(TempInfo.fetch("New York"));
// 람다 표현식으로 onNext 메서드만 구현하여 구독
매번 구독자가 New Tork Seoul 등 도시를 지정해서 가져와야하는 것은 아니다. 정보를 방출하는 쪽에서 파라미터를 받아, 원하는 정보를 가져오게 하면 된다.
// 1초마다 한 개의 온도를 방출하는 Observerable
public static Observable<TempInfo> getTemperature(String town) {
// Observer를 소비하는 함수로부터 Observable 만들기
return Observable.create(emitter -> Observable.interval(1, TimeUnit.SECONDS).subscribe(i -> {
if (!emitter.isDisposed()) {
if (i >= 5) { // 온도를 5번 보고했으면 Observer를 완료하고 스트림 종료
emitter.onComplete();
}
else {
try {
emitter.onNext(TempInfo.fetch(town)); // 온도 보고
}
catch (Exception e) {
emitter.onError(e);
}
}
}
}));
}
emitter는 구독은 못하는 Observable이다. (onSubscribe가 없음) 이 코드로 Observer에게 직접 TempInfo를 전달할 수 있다.
이렇게 되면 Observer에서는 다음과 같이 출력하면 된다.
// 수신한 온도를 출력하는 Observer
public class TempObserver implements Observer<TempInfo> {
@Override
public void onComplete() {
System.out.println("Done!");
}
@Override
public void onError(Throwable throwable) {
System.out.println("Got problem: " + throwable.getMessage());
}
@Override
public void onSubscribe(Disposable disposable) {}
@Override
public void onNext(TempInfo tempInfo) {
System.out.println(tempInfo);
}
}
메인에서 getTemparature 메소드가 반환하는 Observable에 Observer를 구독하는 예제이다. 이렇게 하면 다섯 번 온도를 출력하는 동안 에러가 발생하지 않고 onComplete 신고가 전송되게 된다.
public static void main(String[] args) {
// 초마다 뉴욕의 온도를 방출하는 Observable
Observable<TempInfo> observable = getTemperature("New York")
// 단순 Observer로 Observable에 가입해서 온도 출력
observable.blockingSubscrube(new TempObserver());
}
Observable을 변환하고 합치기
자바 9 플로 API의 Flow.Processor는 한 스트림을 다른 스트림의 입력으로 사용할 수 있었다. 또한 필터링, 매핑 등의 동작이 가능하다. 하지만 스트림을 합치고, 만드는 등의 복잡한 작업을 구현하기는 매우 어려운 일이다. RxJava의 Observable 클래스는 앞서 언급한 복잡한 작업에 대하여 쉽게 처리할 수 있는 다양한 기능들을 제공한다. 아래 예시들을 살펴보자!
Map
// Observer에서 map을 이용해 화씨를 섭씨로 변환
public static Observable<TempInfo> getCelsiusTemperature(String town) {
return getTemperature(town)
.map(temp -> new TempInfo(temp.getTown(), (temp.getTemp() - 32) * 5 / 9));
}
위 예제는 getTemperature 메서드가 반환하는 Observable을 받아, 화씨를 섭씨로 변환한 다음 매 초 한 개씩 온도를 다시 방출하는 또 다른 Observable (TempeInfo)을 반환한다.
Merge
// 한 개 이상의 도시 온도에 대한 보고를 합침
public static Observable<TempInfo> getCelsiusTemperatures(String towns) {
return Observable.merge(Arrays.stream(towns)
.map(TempObservable::getCelsiusTemperature)
.collect(toList()));
}
위 메서드에서는 가변 인수를 문자열 스트림으로 변환한 다음, 각 문자열을 getCelsiusTemperature 메소드로 전달, 각 초마다 도시 온도를 방출하는 Observable로 변환시킨다. 그리고 Observable 스트림은 리스트로 모아지며, Observable 클래스가 제공하는 merge로 전달된다. (서로 다른 Observable이 마치 한 개의 Observable 처럼 동작)
[참고] 모던 자바 인 액션
하 근데... 정리를 했지만 서도 사실 너무 어려웠다!!
하지만 사실 어떨 때 쓰는지 어렴풋이 이해만 되었지만, 일단 비동기로 처리가 된 다는 것이 정말 큰 장점으로 다가온다.
'𝑷𝒓𝒐𝒈𝒓𝒂𝒎𝒎𝒊𝒏𝒈 > 𝐽𝐴𝑉𝐴' 카테고리의 다른 글
[JAVA] 레코드(record) 클래스 - jdk 14 (0) | 2023.01.20 |
---|---|
[JAVA / 모던 자바 인 액션] JAVA 8 부터의 컬렉션 API 개선 (0) | 2023.01.12 |
[JAVA] Collection - Map (HashMap, Hashing) (0) | 2023.01.05 |
[JAVA / 모던 자바 인 액션] 람다 표현식 (0) | 2022.12.31 |
[JAVA] Optional 개념 및 사용법 (0) | 2022.12.16 |