Kotlin Coroutines - Composing multiple and Flattening flows
여러 개의 플로우가 존재할 때, 이를 어떻게 결합해서 사용할 수 있을까?
본 포스팅에서는 다중 플로우의 합성 방법에 대해서 알아보도록 하자.
1. zip
zip
은 두 개의 플로우를 병합하여 새로운 데이터를 만들어내는 연산자이다.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { val nums = (1..3).asFlow() val strs = flowOf("일", "이", "삼") nums.zip(strs) { a, b -> "${a}은(는) $b" } .collect { println(it) } }
출력 결과
1 | 1은(는) 일 |
첫번째 플로우는 1, 2, 3을 스트림하는 플로우,
두번째 플로우는 “일”, “이”, “삼”을 스트림하는 플로우이다.
zip
을 이용하면 두 개의 플로우에서 번갈아가면서 데이터를 수집하여 새로운 데이터로 반환한다.
2. combine
zip
연산자를 사용할 때 두 개의 플로우 중 하나가 더 큰 지연시간을 가지고 있다고 가정해보자.
이때 zip
은 가장 큰 지연시간을 가진 플로우에 맞추어 수행된다.
combine
을 사용하면 두 개의 플로우를 동일한 시점에 소비하지않고, 한 쪽이 갱신되면 새롭게 묶어 데이터를 만들어낸다.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { val nums = (1..3).asFlow().onEach { delay(100L) } val strs = flowOf("일", "이", "삼").onEach { delay(200L) } nums.combine(strs) { a, b -> "${a}은(는) $b" } .collect { println(it) } }
출력 결과
1 | 1은(는) 일 |
숫자를 발행하는 플로우가 지연시간이 더 적으므로, 3이 모든 문자와 묶이는 것을 확인할 수 있다.
zip
과 combine
둘 중 어느게 우세하다고 보긴 어려우므로, 상황에 맞게 적절히 사용할 필요가 있다.
3. flatMapConcat
코루틴 플로우란 녀석을 다시금 생각해보면, 지연시간이든 연산이 오래걸리든
결국 비동기로 스트리밍되어 소비자에게 수신되는 값들의 시퀀스라는 걸 추론할 수 있다.
조금 더 생각해보면 A 플로우에서 emit
되는 값들이 B 플로우의 연산에 필요한 경우도 있을 수 있다.
두 개의 비동기적인 플로우가 있을 때 이를 평평한(flatten) 시퀀스처럼 흐르게 할 필요가 있다.
이제 다중 플로우에 대해서 플래트닝 작업을 수행해보자.
먼저 flatMapConcat
에 대해서 예제를 살펴보자.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun requestFlow(i: Int): Flow<String> = flow { emit("$i: First") delay(500) // wait 500 ms emit("$i: Second") } fun main() = runBlocking<Unit> { val startTime = System.currentTimeMillis() // remember the start time (1..3).asFlow() .onEach { delay(100) } // a number every 100 ms .flatMapConcat { requestFlow(it) } .collect { value -> // collect and print println("$value at ${System.currentTimeMillis() - startTime} ms from start") } }
출력 결과
1 | 1: First at 125 ms from start |
flatMapConcat
은 다음 플로우의 수집을 수행하기전에 현재 수집중인 플로우가 완료될때까지 대기한다.
최초로 주어지는 플로우는 1, 2, 3을 0.1초마다 emit
하지만, flatMapConcat
로 인해, 0.5초씩 전부 지연되며
모든 값이 순차적으로 emit
되는 것을 확인할 수 있다.
4. flatMapMerge
동일한 예제에서 flatMapConcat
을 flatMapMerge
로 교체해보자.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun requestFlow(i: Int): Flow<String> = flow { emit("$i: First") delay(500) // wait 500 ms emit("$i: Second") } fun main() = runBlocking<Unit> { val startTime = System.currentTimeMillis() // remember the start time (1..3).asFlow() .onEach { delay(100) } // a number every 100 ms .flatMapMerge { requestFlow(it) } .collect { value -> // collect and print println("$value at ${System.currentTimeMillis() - startTime} ms from start") } }
출력 결과
1 | 1: First at 254 ms from start |
출력 결과에서 알 수 있듯 flatMapMerge
는 플로우의 완료를 기다리지 않으며, 가능한 빨리 값들이 emit
되도록 출력하게끔 한다.
flatMapMerge
연산자 외에도 flattenMerge
로도 구현할 수 있으며, 모두 concurrency
인자를 통해 동시에 수집 가능한 플로우의 개수를 지정할 수 있다.
참고 동시 수집 가능한 플로우의 개수는
DEFAULT_CONCURRENCY
값에 따라 달라진다. 기본값은 16이며, JVM의DEFAULT_CONCURRENCY_PROPERTY_NAME
값에 의해 변경될 수 있다.
1 |
|
5. flatMapLatest
이번엔 flatMapMerge
을 flatMapLatest
로 교체해보자.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun requestFlow(i: Int): Flow<String> = flow { emit("$i: First") delay(500) // wait 500 ms emit("$i: Second") } fun main() = runBlocking<Unit> { val startTime = System.currentTimeMillis() // remember the start time (1..3).asFlow() .onEach { delay(100) } // a number every 100 ms .flatMapLatest { requestFlow(it) } .collect { value -> // collect and print println("$value at ${System.currentTimeMillis() - startTime} ms from start") } }
출력 결과
1 | 1: First at 160 ms from start |
이름에서 추론할 수 있듯 collectLastes
처럼 마지막 값만 끝까지 소비하게 된다.
새로운 플로우가 emit
될 때마자, 직전의 플로우를 종료시켜 버린다.