Kotlin Coroutines - Flow Conext & Flow Buffering
플로우의 수집은 항상 호출한 코루틴의 컨텍스트 안에서 수행된다.
아래 코드를 보자.
1 | withContext(context) { |
위의 코드에서 simple()
이라는 플로우는 구현 내용과는 별개로 작성자가 명시한 코루틴의 컨텍스트에서 실행된다.
이러한 특성을 컨텍스트 보존(context preservation) 이라고 한다.
예제를 통해 보다 상세히 파악해보자.
1. Flow runs in Coroutine Context
상술했듯, 플로우는 코루틴의 컨텍스트에서 호출된다.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") fun simple(): Flow<Int> = flow { log("flow is start") for (i in 1..10) { emit(i) } } fun main() = runBlocking<Unit> { launch(Dispatchers.IO) { simple() .collect { value -> log("value : $value") } } }
출력 결과
1 | [DefaultDispatcher-worker-1 @coroutine#2] flow is start |
simple()
플로우가 Dispatchers.IO
에서 호출되어, 동일한 DefaultDispatcher
를 출력하고 있음을 확인할 수 있다.
2. If context switching is not possible
플로우의 내부에서 Dispatchers를 변경하는 경우엔 어떻게 될까?
Dispatchers.IO
로 실행한 코루틴 내부의 플로우에서 Dispatchers.Default
로 변환하도록 예제를 수정한다.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") fun simple(): Flow<Int> = flow { withContext(Dispatchers.Default) { for (i in 1..10) { delay(100L) emit(i) } } } fun main() = runBlocking<Unit> { launch(Dispatchers.IO) { simple() .collect { value -> log("value : $value") } } }
출력 결과
1 | Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: |
위의 예제 코드에서 오류가 발생했다.
Dispatcher를 변경했기 때문인데, Dispatcher를 제거하면 정상 동작한다.
우리가 원하는 Context 변경은 어떻게 할 수 있을까?
3. flowOn
Context 변경을 오류없이 진행하려면 flowOn
연산자를 이용해야 한다.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") fun simple(): Flow<Int> = flow { for (i in 1..10) { delay(100L) log("emit(${i})") emit(i) } }.flowOn(Dispatchers.Default) fun main() = runBlocking<Unit> { simple().collect { value -> log("value : $value") } }
출력 결과
1 | [DefaultDispatcher-worker-1 @coroutine#2] emit(1) |
flowOn
연산자는 업스트림 대상인 블록을 어떤 컨텍스트에서 수행할지 결정해준다.
1 | { |
업/다운 스트림이 결정되는 건 위와 같이 flowOn
의 위치에 따라 결정된다.
출력 결과를 보면 DefaultDispatcher
와 main
으로 구분되어 호출되고 있음을 확인할 수 있다.
4. Without buffer
필자는 유투브나 OTT와 같은 스트리밍 서비스를 자주 사용한다.
만약 스트리밍 서버에서 나에게 보내주는 영상의 속도보다, 내가 재생해서 보는 영상의 소비가 더 크다면?
어느 정도 다운로드되어 재생할 영상이 준비될 때까지 재생이 멈출 것이다.
참고 물론 요즘엔 인터넷 속도도 빠르고, 약전계에서도 화질을 낮추는 식으로 동적 대응한다.
이때 동영상 파일을 채우는 메모리 영역을 버퍼라 하고, 채우는 동작을 버퍼링 이라고 한다.
코루틴의 플로우도 마찬가지이다.
데이터를 생산하는 쪽과 소비하는 쪽의 균형이 맞지않으면 버퍼를 사용해야 한다.
만약 버퍼가 없다면 어떻게 될까?
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.system.* fun simple(): Flow<Int> = flow { for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit> { val time = measureTimeMillis { simple().collect { value -> delay(300) println(value) } } println("Collected in $time ms") }
출력 결과
1 | 1 |
코드를 보면 소비하는 쪽에서 0.3초, 생산하는 쪽에서 0.1초의 지연을 사용하고 있다.
3개의 데이터를 생산하기 위해 1.2초를 사용해야하는 것이다.
5. buffer
플로우에 buffer
를 사용하면 소비자의 지연과 상관없이 계속해서 데이터를 받을 수 있다.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.system.* fun simple(): Flow<Int> = flow { for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit> { val time = measureTimeMillis { simple() .buffer() // HERE .collect { value -> delay(300) println(value) } } println("Collected in $time ms") }
출력 결과
1 | 1 |
단순히 buffer
를 붙인 것만으로 총 수행 시간이 감소했다.
이는 collect
의 지연 시간인 0.3초와는 상관없이 플로우가 계속 데이터를 스트리밍했기 때문이다.
즉 소비자의 지연시간만이 유의미해진다.
6. conflate
이번엔 conflate
를 사용해보자.
conflate
을 사용하면 소비보다 빠르게 발생한 데이터의 중간값들을 전부 누락시킨다.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.system.* fun simple(): Flow<Int> = flow { for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit> { val time = measureTimeMillis { simple().conflate() .collect { value -> delay(300) println(value) } } println("Collected in $time ms") }
출력 결과
1 | 1 |
정리해보면 collect
에서 conflate
를 사용하면 한 번 소비하기 시작한 값은 끝까지 소비하며, 생산되는 값의 타이밍과는 관계없이, 소비 완료 시점에서 가장 최신 값을 다시 소비하는 것으로 이해하면 된다.
7. collectLatest
conflate
을 이용해 최신 데이터만 사용하는 것이 가능하지만, 소비자 측의 속도가 너무 느린 경우
collectLatest
를 사용할 수 있다.
collectLatest
를 사용하면 새로운 값이 emit
될 때마다 소비자 블록을 종료시키고 다시 재시작하게 해준다.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.system.* fun simple(): Flow<Int> = flow { for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit> { val time = measureTimeMillis { simple() .collectLatest { value -> delay(300) println(value) } } println("Collected in $time ms") }
출력 결과
1 | 3 |
1과 2가 스트림되었을 때, 이미 0.3초만큼 지연중이기때문에 소비자 블록을 종료해버리고,
제일 마지막에 스트림된 값인 3만 소비하게 된다.