(Kotlin Coroutines) 009. Flow Context & Flow Buffering

Kotlin Coroutines - Flow Conext & Flow Buffering

플로우의 수집은 항상 호출한 코루틴의 컨텍스트 안에서 수행된다.

아래 코드를 보자.

1
2
3
4
5
withContext(context) {
simple().collect { value ->
println(value) // run in the specified context
}
}

위의 코드에서 simple()이라는 플로우는 구현 내용과는 별개로 작성자가 명시한 코루틴의 컨텍스트에서 실행된다.

이러한 특성을 컨텍스트 보존(context preservation) 이라고 한다.

출처 Kotlinlang.org - Asynchronous Flow

예제를 통해 보다 상세히 파악해보자.

1. Flow runs in Coroutine Context

상술했듯, 플로우는 코루틴의 컨텍스트에서 호출된다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun simple(): Flow<Int> = flow {
    log("flow is start")
    for (i in 1..10) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    launch(Dispatchers.IO) {
        simple()
            .collect { value -> log("value : $value") } 
    }
}   

출력 결과

1
2
3
4
5
6
7
8
9
10
11
[DefaultDispatcher-worker-1 @coroutine#2] flow is start
[DefaultDispatcher-worker-1 @coroutine#2] value : 1
[DefaultDispatcher-worker-1 @coroutine#2] value : 2
[DefaultDispatcher-worker-1 @coroutine#2] value : 3
[DefaultDispatcher-worker-1 @coroutine#2] value : 4
[DefaultDispatcher-worker-1 @coroutine#2] value : 5
[DefaultDispatcher-worker-1 @coroutine#2] value : 6
[DefaultDispatcher-worker-1 @coroutine#2] value : 7
[DefaultDispatcher-worker-1 @coroutine#2] value : 8
[DefaultDispatcher-worker-1 @coroutine#2] value : 9
[DefaultDispatcher-worker-1 @coroutine#2] value : 10

simple() 플로우가 Dispatchers.IO 에서 호출되어, 동일한 DefaultDispatcher를 출력하고 있음을 확인할 수 있다.

2. If context switching is not possible

플로우의 내부에서 Dispatchers를 변경하는 경우엔 어떻게 될까?

Dispatchers.IO로 실행한 코루틴 내부의 플로우에서 Dispatchers.Default로 변환하도록 예제를 수정한다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun simple(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        for (i in 1..10) {
            delay(100L)
            emit(i)
        }
    }
}  

fun main() = runBlocking<Unit> {
    launch(Dispatchers.IO) {
        simple()
            .collect { value -> log("value : $value") } 
    }
}   

출력 결과

1
2
3
4
5
6
7
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(2), "coroutine#2":StandaloneCoroutine{Active}@6b9e5ee0, Dispatchers.IO],
but emission happened in [CoroutineId(2), "coroutine#2":DispatchedCoroutine{Active}@5f867b9c, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext (SafeCollector.common.kt:85)
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext (SafeCollector.kt:88)
at kotlinx.coroutines.flow.internal.SafeCollector.emit (SafeCollector.kt:74)

위의 예제 코드에서 오류가 발생했다.

Dispatcher를 변경했기 때문인데, Dispatcher를 제거하면 정상 동작한다.

우리가 원하는 Context 변경은 어떻게 할 수 있을까?

3. flowOn

Context 변경을 오류없이 진행하려면 flowOn 연산자를 이용해야 한다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun simple(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100L)
        log("emit(${i})")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking<Unit> {
    simple().collect { value -> 
        log("value : $value")
    } 
}   

출력 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[DefaultDispatcher-worker-1 @coroutine#2] emit(1)
[main @coroutine#1] value : 1
[DefaultDispatcher-worker-1 @coroutine#2] emit(2)
[main @coroutine#1] value : 2
[DefaultDispatcher-worker-1 @coroutine#2] emit(3)
[main @coroutine#1] value : 3
[DefaultDispatcher-worker-1 @coroutine#2] emit(4)
[main @coroutine#1] value : 4
[DefaultDispatcher-worker-1 @coroutine#2] emit(5)
[main @coroutine#1] value : 5
[DefaultDispatcher-worker-1 @coroutine#2] emit(6)
[main @coroutine#1] value : 6
[DefaultDispatcher-worker-1 @coroutine#2] emit(7)
[main @coroutine#1] value : 7
[DefaultDispatcher-worker-1 @coroutine#2] emit(8)
[main @coroutine#1] value : 8
[DefaultDispatcher-worker-1 @coroutine#2] emit(9)
[main @coroutine#1] value : 9
[DefaultDispatcher-worker-1 @coroutine#2] emit(10)
[main @coroutine#1] value : 10

flowOn 연산자는 업스트림 대상인 블록을 어떤 컨텍스트에서 수행할지 결정해준다.

1
2
3
4
5
6
{ 
// Upstream Block
}.flowOn(Dispatcher.Default)
.map {
// Downstream Block
}

업/다운 스트림이 결정되는 건 위와 같이 flowOn의 위치에 따라 결정된다.

출력 결과를 보면 DefaultDispatchermain 으로 구분되어 호출되고 있음을 확인할 수 있다.

4. Without buffer

필자는 유투브나 OTT와 같은 스트리밍 서비스를 자주 사용한다.

만약 스트리밍 서버에서 나에게 보내주는 영상의 속도보다, 내가 재생해서 보는 영상의 소비가 더 크다면?

어느 정도 다운로드되어 재생할 영상이 준비될 때까지 재생이 멈출 것이다.

참고 물론 요즘엔 인터넷 속도도 빠르고, 약전계에서도 화질을 낮추는 식으로 동적 대응한다.

이때 동영상 파일을 채우는 메모리 영역을 버퍼라 하고, 채우는 동작을 버퍼링 이라고 한다.

코루틴의 플로우도 마찬가지이다.

데이터를 생산하는 쪽과 소비하는 쪽의 균형이 맞지않으면 버퍼를 사용해야 한다.

만약 버퍼가 없다면 어떻게 될까?

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300)
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

출력 결과

1
2
3
4
1
2
3
Collected in 1220 ms

코드를 보면 소비하는 쪽에서 0.3초, 생산하는 쪽에서 0.1초의 지연을 사용하고 있다.

3개의 데이터를 생산하기 위해 1.2초를 사용해야하는 것이다.

5. buffer

플로우에 buffer를 사용하면 소비자의 지연과 상관없이 계속해서 데이터를 받을 수 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
        .buffer() // HERE
        .collect { value -> 
            delay(300)
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

출력 결과

1
2
3
4
1
2
3
Collected in 1049 ms

단순히 buffer를 붙인 것만으로 총 수행 시간이 감소했다.

이는 collect의 지연 시간인 0.3초와는 상관없이 플로우가 계속 데이터를 스트리밍했기 때문이다.

즉 소비자의 지연시간만이 유의미해진다.

6. conflate

이번엔 conflate를 사용해보자.

conflate을 사용하면 소비보다 빠르게 발생한 데이터의 중간값들을 전부 누락시킨다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().conflate()
            .collect { value -> 
                delay(300)
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

출력 결과

1
2
3
1
3
Collected in 752 ms

정리해보면 collect에서 conflate를 사용하면 한 번 소비하기 시작한 값은 끝까지 소비하며, 생산되는 값의 타이밍과는 관계없이, 소비 완료 시점에서 가장 최신 값을 다시 소비하는 것으로 이해하면 된다.

7. collectLatest

conflate 을 이용해 최신 데이터만 사용하는 것이 가능하지만, 소비자 측의 속도가 너무 느린 경우

collectLatest를 사용할 수 있다.

collectLatest를 사용하면 새로운 값이 emit 될 때마다 소비자 블록을 종료시키고 다시 재시작하게 해준다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> 
                delay(300)
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

출력 결과

1
2
3
Collected in 709 ms

1과 2가 스트림되었을 때, 이미 0.3초만큼 지연중이기때문에 소비자 블록을 종료해버리고,

제일 마지막에 스트림된 값인 3만 소비하게 된다.