(Kotlin Coroutines) 010. Composing multiple and Flattening flows

Kotlin Coroutines - Composing multiple and Flattening flows

여러 개의 플로우가 존재할 때, 이를 어떻게 결합해서 사용할 수 있을까?

본 포스팅에서는 다중 플로우의 합성 방법에 대해서 알아보도록 하자.

1. zip

zip은 두 개의 플로우를 병합하여 새로운 데이터를 만들어내는 연산자이다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 
    val nums = (1..3).asFlow()
    val strs = flowOf("일", "이", "삼") 
    nums.zip(strs) { a, b -> "${a}은(는) $b" }
        .collect { println(it) }
}

출력 결과

1
2
3
1은(는) 일
2은(는) 이
3은(는) 삼

첫번째 플로우는 1, 2, 3을 스트림하는 플로우,

두번째 플로우는 “일”, “이”, “삼”을 스트림하는 플로우이다.

zip을 이용하면 두 개의 플로우에서 번갈아가면서 데이터를 수집하여 새로운 데이터로 반환한다.

2. combine

zip 연산자를 사용할 때 두 개의 플로우 중 하나가 더 큰 지연시간을 가지고 있다고 가정해보자.

이때 zip은 가장 큰 지연시간을 가진 플로우에 맞추어 수행된다.

combine을 사용하면 두 개의 플로우를 동일한 시점에 소비하지않고, 한 쪽이 갱신되면 새롭게 묶어 데이터를 만들어낸다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 
    val nums = (1..3).asFlow().onEach { delay(100L) }
    val strs = flowOf("일", "이", "삼").onEach { delay(200L) }
    nums.combine(strs) { a, b -> "${a}은(는) $b" }
        .collect { println(it) }
}

출력 결과

1
2
3
4
5
1은(는) 일
2은(는) 일
3은(는) 일
3은(는) 이
3은(는) 삼

숫자를 발행하는 플로우가 지연시간이 더 적으므로, 3이 모든 문자와 묶이는 것을 확인할 수 있다.

zipcombine 둘 중 어느게 우세하다고 보긴 어려우므로, 상황에 맞게 적절히 사용할 필요가 있다.

3. flatMapConcat

코루틴 플로우란 녀석을 다시금 생각해보면, 지연시간이든 연산이 오래걸리든

결국 비동기로 스트리밍되어 소비자에게 수신되는 값들의 시퀀스라는 걸 추론할 수 있다.

조금 더 생각해보면 A 플로우에서 emit되는 값들이 B 플로우의 연산에 필요한 경우도 있을 수 있다.

두 개의 비동기적인 플로우가 있을 때 이를 평평한(flatten) 시퀀스처럼 흐르게 할 필요가 있다.

이제 다중 플로우에 대해서 플래트닝 작업을 수행해보자.

먼저 flatMapConcat에 대해서 예제를 살펴보자.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow()
        .onEach { delay(100) } // a number every 100 ms 
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

출력 결과

1
2
3
4
5
6
1: First at 125 ms from start
1: Second at 626 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1828 ms from start

flatMapConcat은 다음 플로우의 수집을 수행하기전에 현재 수집중인 플로우가 완료될때까지 대기한다.

최초로 주어지는 플로우는 1, 2, 3을 0.1초마다 emit 하지만, flatMapConcat로 인해, 0.5초씩 전부 지연되며

모든 값이 순차적으로 emit 되는 것을 확인할 수 있다.

4. flatMapMerge

동일한 예제에서 flatMapConcatflatMapMerge로 교체해보자.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow()
        .onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

출력 결과

1
2
3
4
5
6
1: First at 254 ms from start
2: First at 326 ms from start
3: First at 426 ms from start
1: Second at 754 ms from start
2: Second at 826 ms from start
3: Second at 928 ms from start

출력 결과에서 알 수 있듯 flatMapMerge는 플로우의 완료를 기다리지 않으며, 가능한 빨리 값들이 emit 되도록 출력하게끔 한다.

flatMapMerge 연산자 외에도 flattenMerge로도 구현할 수 있으며, 모두 concurrency 인자를 통해 동시에 수집 가능한 플로우의 개수를 지정할 수 있다.

참고 동시 수집 가능한 플로우의 개수는 DEFAULT_CONCURRENCY 값에 따라 달라진다. 기본값은 16이며, JVM의 DEFAULT_CONCURRENCY_PROPERTY_NAME 값에 의해 변경될 수 있다.

1
2
@FlowPreview
public const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String = "kotlinx.coroutines.flow.defaultConcurrency"

5. flatMapLatest

이번엔 flatMapMergeflatMapLatest로 교체해보자.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    (1..3).asFlow()
        .onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

출력 결과

1
2
3
4
1: First at 160 ms from start
2: First at 295 ms from start
3: First at 396 ms from start
3: Second at 897 ms from start

이름에서 추론할 수 있듯 collectLastes처럼 마지막 값만 끝까지 소비하게 된다.

새로운 플로우가 emit 될 때마자, 직전의 플로우를 종료시켜 버린다.