네이버에서 Backpressure 뜻을 찾아보면 역압으로 알려줍니다. 배압과 의미는 같으며, 역압이란 용어는 거의 사용하지 않습니다.
배압(Backpressure)
배압이란 생산되는 데이터와 소비의 균형이 어긋나는 현상입니다. 만약 10,000개의 데이터를 0.1초마다 발행하고, 소비는 10초마다 한다면 소비와 관계없이 데이터는 스트림에 계속 쌓이게 됩니다. 즉, Observable이 데이터를 발행하는 속도를 Observer의 소비 속도가 따라가지 못하는 것입니다.
Flowable(사전적의미: 유동적임)
기존의 Observable이 배압 현상을 제어하지 못하는 반면, Flowable은 배압 현상을 스스로 제어할 수 있습니다.
다음의 두 코드를 살펴보도록 하겠습니다.
Observable
Observable.range(1, 10000)
.doOnNext(integer -> System.out.println("Emit Data : "+integer))
.observeOn(Schedulers.io())
.subscribe(integer -> {
System.out.println("Consume Data : "+integer);
Thread.sleep(100);
});
Thread.sleep(100*10000);
Result:
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 9998
Emit Data : 9999
Emit Data : 10000
Consume Data : 2
Consume Data : 3
Consume Data : 4
...
Flowable
Flowable.range(1, 10000)
.doOnNext(integer -> System.out.println("Emit Data : "+integer))
.observeOn(Schedulers.io())
.subscribe(integer -> {
System.out.println("Consume Data : "+integer);
Thread.sleep(100);
});
Thread.sleep(100*10000);
[실행결과]
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Emit Data : 5
Consume Data : 1
...
Emit Data : 126
Emit Data : 127
Emit Data : 128
Consume Data : 2
Consume Data : 3
Consume Data : 4
...
두 예제 모두 10,000개의 데이터를 발행하면서, 소비는 100ms의 delay를 추가하였습니다.
Observable을 사용한 경우에는 데이터 발행과 소비가 균형적으로 일어나지 않으며 데이터는 소비와 상관없이 스트림에 계속 쌓이게 됩니다.
반면 Flowable을 사용한 경우에는 데이터가 일정량 누적되면 데이터를 더 이상 발행하지 않는 것을 확인할 수 있습니다.
이와 같이, Flowable은 스트림에 끊임없이 쌓이는 데이터의 양을 제어할 수 있는 Observable의 또 다른 형태입니다.
When to use Observable? When to use Flowable?
그렇다면 언제 Observable을, 언제 Flowable을 사용해야할까? RxJava Wiki에 Observable과 Flowable을 선택하는 기준이 포스트 되어 있습니다.
Observable을 사용해야 하는 경우
- 1,000개 미만의 데이터 흐름이 발생하는 경우
- 낮은 데이터 소스만을 활용하여 OutOfMemoryException이 발생할 확률이 낮은 경우
- 마우스 이벤트나 터치 이벤트와 같은 GUI 프로그래밍을 하는 경우 (초당 1,000회 이하의 이벤트는 Observable의 sample()이나 debounce()로 핸들링 가능)
- 동기적인 프로그래밍이 필요하지만 플랫폼에서 Java Streams을 지원하지 않는 경우
Flowable을 사용해야 하는 경우
- 10,000개 이상의 데이터 흐름이 발생하는 경우
- 디스크에서 파일을 읽는 경우 (기본적으로 Blocking/Pull-based 방식)
- JDBC에서 데이터베이스를 읽는 경우 (기본적으로 Blocking/Pull-based 방식)
- 네트워크 IO 실행 시
- Blocking/Pull-based 방식을 사용하고 있는데 나중에 Non-Blocking 방식의 Reactive API/드라이버에서 데이터를 가져올 일이 있는 경우
배압 전략
Flowable에도 배압을 제어하지 못해 MissingBackpressureException이 발생할 수 있는 예외상황(1)이 존재합니다. 따라서 Flowable에 배압 전략을 명시함으로써 배압을 제어할 수 있습니다.
5가지의 배압 전략이 존재하며 각각의 내용은 다음과 같습니다.
(1) Flowable과 interval()을 같이 사용하는 경우 interval 연산자는 스케줄러와 관계없이 시간에 의존해 데이터를 발행하므로 에러가 발생됩니다.
이름 | enum | 내용 |
MISSING | BackpressureStrategy.MISSING | 배압 전략을 구현하지 않음 |
ERROR | BackpressureStrategy.ERROR | 소비 속도가 발행 속도를 따라가지 못하는 경우 MissingBackpressureException 발생 |
BUFFER | BackpressureStrategy.BUFFER | 데이터를 소비할 때까지 데이터를 버퍼에 넣어둠. 무한한 크기의 큐이지만 OOME이 발생할 수 있음. |
DROP | BackpressureStrategy.DROP | 소비 속도가 발행 속도를 따라가지 못하는 경우 발행된 데이터를 모두 버림 |
LATEST | BackpressureStrategy.LATEST | 구독자가 데이터를 받을 준비가 될 때까지 최신 데이터만 유지하고 나머지는 버림 |
예제
create() 연산자를 통해 Flowable을 생성하는 경우 배압 전략을 명시해주어야 합니다.
Flowable.create(emitter -> {
for (int i = 0; i < 10000; i++) emitter.onNext(i);
emitter.onComplete();
}, BackpressureStrategy.DROP)
.observeOn(Schedulers.io())
.subscribe();
배압 제어 연산자
RxJava의 연산자 중에는 생성된 Flowable에 배압 전략을 적용할 수 있는 3가지 연산자를 제공합니다.
onBackPressureBuffer()
: 배압 이슈가 발생했을 때 별도의 버퍼에 저장을 합니다.(기본적으로 128개의 버퍼가 있습니다.)
onBackPressureDrop()
: 배압 이슈가 발생했을 때 해당 데이터를 무시합니다.
onBackPressureLatest()
: 처리할 수 없어서 쌓이는 데이터를 무시하면서 최신 데이터만 유지합니다.
예제
Flowable.range(1, 1000)
.onBackpressureLatest()
.doOnNext(integer -> System.out.println("Emit Data : "+integer))
.observeOn(Schedulers.io())
.subscribe(integer -> {
System.out.println("Consume Data : "+integer);
Thread.sleep(100);
});
Thread.sleep(100*1000);
Result:
Emit Data : 1
Emit Data : 2
Emit Data : 3
Emit Data : 4
Consume Data : 1
Emit Data : 5
...
Emit Data : 128
Consume Data : 2
Consume Data : 3
...
Consume Data : 95
Consume Data : 96
Emit Data : 1000
Consume Data : 97
Consume Data : 98
...
Consume Data : 128
Consume Data : 1000
참고 글: Dealing with Backpressure with RxJava
https://www.baeldung.com/rxjava-backpressure
'Programming 개발은 구글로 > JAVA[Android]' 카테고리의 다른 글
[RxJava] 2. Reactive Programming 이란? (0) | 2022.05.26 |
---|---|
[RxJava] 5.subscribeOn과 observeOn의 차이점 (0) | 2022.05.25 |
[안드로이드] GPS 정보로 현재 위치 찾기 (0) | 2022.05.23 |
Android 개발자를 위한 Google I/O 핵심 내용 13가지 (0) | 2022.05.22 |
[안드로이드] App Icon 추가 및 수정하기 (0) | 2022.05.19 |
댓글