(Kotlin Coroutines) 008. Flow basics

Kotlin Coroutines - Flow basics

이번 포스팅에서는 코루틴의 플로우에 대해서 알아보자.

플로우는 리액티브 프로그래밍의 데이터 스트림을 구현하기 위한 라이브러리이다.

이 데이터 스트림은 크게 3가지 항목으로 구성된다.

Entities involved in streams of data: consumer, optional intermediaries, and producer.

출처 Android Developer - Kotlin flows on Android

1. 생산자(producer)

데이터 스트림에 추가되는 데이터를 생산한다.

코루틴 플로우를 이용해 비동기적으로 데이터가 생산될 수도 있다.

2. 중개자(intermediary)

스트림에 내보내는 각각의 값이나 스트림 자체를 수정한다.

중개자의 경우 Optional로, Flow에 꼭 필요한 것은 아니다.

3. 소비자(consumer)

스트림의 데이터를 사용하는 주체이다.

간략하게 개념을 잡았으면 이제 예제를 통해 코루틴 플로우를 익혀보도록 하자.

1. Hello, Flow!

상술했듯 플로우는 코루틴을 기반으로 비동기 스트림을 제어할 수 있다.

아래 간단한 예제를 보자.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun doFlow(): Flow<Int> = flow {
    repeat(10) {
        emit(Random.nextInt(0, 500))
        delay(10L)
    }
}

fun main() = runBlocking {
    doFlow().collect { value ->
        println(value)
    }
}

출력 결과

1
2
3
4
5
6
7
8
9
10
326
402
288
423
353
29
398
292
460
6

doFlow() 메서드는 Flow<Int> 타입을 반환하고 있다.

즉 플로우가 반환하는 타입은 Flow<T> 형태의 객체임을 알 수 있다.

여기서 Flow<Int>Int는 플로우가 정수 타입을 emit하고 있음을 알 수 있다.

runBlocking 블록처럼 flow도 블럭을 가진 Flow Builder 역할을 수행한다.

코드 내용을 살펴보면 0에서 500 사이의 난수를 0.01초씩 지연하면서 10번 반복하여 스트림으로 emit하는 코드임을 알 수 있다.

여기서 emit 되는 값들을 얻어오기 위해 collect를 사용해야한다.

플로우는 콜드 스트림이기 때문에 collect와 같은 함수를 호출하여 요청 해주지어야 값을 하나씩 스트리밍하기 때문이다.

참고 콜드 스트림에 대치되는 핫 스트림도 있다. 핫 스트림은 요청 전에 이미 활성화되어 0개 이상의 소비자들에게 값을 스트리밍한다.

2. withTimeoutOrNull

코루틴을 일정시간 이후 취소처리를 해주는 withTimeoutOrNull 함수가 있었다.

플로우도 마찬가지로 withTimeout 함수나 withTimeoutOrNull 함수를 이용해 취소할 수 있다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun doFlow(): Flow<Int> = flow {
    repeat(10) {
        emit(Random.nextInt(0, 500))
        delay(100L)
    }
}

fun main() = runBlocking<Unit> {
    val result = withTimeoutOrNull(500L) {
        doFlow().collect { value ->
            println(value)
        }
        true
    } ?: false
    if (!result) {
        println("Canceled.")
    }
}

출력 결과

1
2
3
4
5
6
416
332
54
70
127
Canceled.

3. flowOf

위에서 언급한 플로우 빌더로 flow만 있는 것은 아니다.

다른 함수를 이용해 플로우를 만들 수 있는 flowOf에 대해서 알아보자.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    flowOf(1, 2, 3, 4, 5).collect { value ->
        println(value)
    }
}

출력 결과

1
2
3
4
5
1
2
3
4
5

flowOf의 차이는 고정된 값들을 emit하는 플로우를 만들어낼 수 있다.

만약 flow 빌더로 고정된 값들을 emit하려면 아래와 같이 작업해야 것이다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    // flowOf(1, 2, 3, 4, 5).collect { value ->
    //     println(value)
    // }
    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }.collect { value -> println(value) }
}

출력 결과

1
2
3
4
5
1
2
3
4
5

4. asFlow

flow, flowOf 이외에도 asFlow를 통해 플로우를 만들어낼 수 있다.

asFlow는 컬렉션이나 시퀀스를 전달하여, 해당 객체를 구성하는 원소값들을 스트리밍해준다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    println("listOf().asFlow()")
    listOf(1, 2, 3, 4, 5).asFlow().collect { value ->
        println(value)
    }
    println("{Sequence}.asFlow()")
    (6..10).asFlow().collect {
        println(it)
    }
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
12
listOf().asFlow()
1
2
3
4
5
{Sequence}.asFlow()
6
7
8
9
10

5. map

플로우에서 스트리밍되는 값들을 중간연산자를 통해 변환할 수 있다.

예제를 통해 알아보도록 하자.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun doFlow(): Flow<Int> = flow {
    repeat(10) {
        emit(Random.nextInt(0, 500))
        delay(10L)
    }
}

fun main() = runBlocking {
    doFlow().map {
        "$it ${it * 2}"
    }.collect { value ->
        println(value)
    }
}

출력 결과

1
2
3
4
5
6
7
8
9
10
411 822
323 646
215 430
97 194
477 954
450 900
204 408
23 46
289 578
262 524

doFlow() 메서드에 붙은 map 연산자를 통해 기존 플로우를 새로운 플로우로 만들어낸 뒤,

새로운 플로우에서 collect를 호출하여 반환되는 값들을 출력하였다.

6. filter

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    (1..20).asFlow().filter {
        (it % 2) == 0
    }.collect {
        println(it)
    }
}

출력 결과

1
2
3
4
5
6
7
8
9
10
2
4
6
8
10
12
14
16
18
20

코틀린 유저라면 어느정도 친숙할 filter 연산자를 통해 값을 가공할 수 있다.

출력 결과를 보면 1부터 20까지 순회하는 시퀀스에서 필터링 조건인 짝수에 해당하는 값들만 collect 블록에 전달되는 것을 볼 수 있다.

7. filterNot

위의 예제에서 짝수 대신 홀수만 출력하려면 어떻게 해야할까?

filter 연산자의 조건을 변경할 수도 있겠지만, 완전히 반대되는 조건이 필요한 경우 filterNot을 사용하면 된다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    (1..20).asFlow().filterNot {
        (it % 2) == 0
    }.collect {
        println(it)
    }
}

출력 결과

1
2
3
4
5
6
7
8
9
10
1
3
5
7
9
11
13
15
17
19

filter 예제와 달리 홀수만 출력되는 것을 확인할 수 있다.

8. transform

transform 연산자를 활용하면 좀 더 유연하게 스트림을 변경할 수 있다.

map이나 filter보다 조금 더 복잡한 변경이 필요한 경우 사용한다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..10).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.collect {
        println(it)
    }
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1
2
2
4
3
6
4
8
5
10
6
12
7
14
8
16
9
18
10
20

1부터 10까지 순회하는 시퀀스를 통해 얻은 1~10까지의 값을 emit하고 직후 2배의 값을 emit하여 총 20개의 값이 출력된 것을 확인할 수 있다.

예제에서 두 번의 emit을 사용한 것처럼 transform을 사용하면 여러 개의 emit을 추가하여 여러 변환을 스트리밍하게 해준다.

9. take

take 연산자를 쓰면 스트리밍된 데이터 중에서 최초 N개의 결과만 선택적으로 취합할 수가 있다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..10).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.take(5)
    .collect {
        println(it)
    }
}

출력 결과

1
2
3
4
5
1
2
2
4
3

take(5)로 인해서 emit되는 값들 중 최초 5개만 출력된 것을 확인할 수 있다.

10. takeWhile

take를 통해 특정 개수의 값을 가져올 순 있지만, 활용도가 떨어질 것이다.

좀 더 활용도를 높이기 위해 takeWhile 연산자를 이용해 조건을 부여하여 값을 얻어올 수 있다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..10).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.takeWhile {
        it >= 5
    }.collect {
        println(it)
    }
}

출력 결과

1
출력 결과 없음

아무것도 출력이 되지 않는데 이는 조건에 맞지않는 값이 나오면 즉시 flow가 중지되기 때문이다.

출력이 되도록 코드를 수정해보자.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (5..10).asFlow().transform { // HERE
        emit(it)
        emit(someCalc(it))
    }.takeWhile {
        it >= 5
    }.collect {
        println(it)
    }
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
12
5
10
6
12
7
14
8
16
9
18
10
20

코드의 의도대로 잘 출력되었음을 확인할 수 있다.

11. drop

take가 주어진 크기만큼 스트림되는 값을 취한다면, 그 반대로 주어진 크기만큼 스트림되는 값을 버리는 drop 연산자도 있다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..10).asFlow().transform {
        emit(it)
        emit(someCalc(it))
    }.drop(5)
    .collect {
        println(it)
    }
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
6
4
8
5
10
6
12
7
14
8
16
9
18
10
20

12. dropWhile

take에 대응되는 drop이 있듯이, takeWhile에 대응되는 dropWhile도 존재한다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    (1..10).asFlow().transform { 
        emit(it)
        emit(someCalc(it))
    }.dropWhile {
        it < 5
    }.collect {
        println(it)
    }
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
6
4
8
5
10
6
12
7
14
8
16
9
18
10
20

스트리밍된 값증 1, 2, 2, 4, 3 까진 전부 드랍한 뒤, 6이 나오고 나서 조건문이 해제되어 이후 스트리밍된 값들이 전부 출력된다.

13. reduce

여태 예제에서 호출한 collect 함수는 값을 다 받아오고 플로우를 끝내는 역할을 하기에 종단 연산자라고 불린다.

이 종단 연산자에는 collect만 있는 것이 아니라 reduce fold 등의 연산자와 toList toSet과 같이 컬렉션으로 변환해주는 함수들도 존재한다.

먼저 reduce에 대해서 예제를 살펴보자.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    val value = (1..10)
        .asFlow()
        .reduce { a, b ->
            a + b
        }
    println(value)
}

출력 결과

1
55

reduce 가 호출이 되면 첫 번째, 두 번째 스트림되는 값을 입력 받는다.

위의 코드에서는 a에는 1, b에는 2가 주어지고, 이를 결합해 3을 반환하게 되고,

이후 주어지는 3을 받아와 기존에 가지고 있던 3에 더하는 식으로 반복된다.

즉 1부터 10까지의 총합을 구하는 로직과 동일하게 되어 55를 반환하게 된다.

추상적으로 표현하자면, 누직적으로 값을 계산해나가는 연산자라고 볼 수 있다.

참고 mapreduce는 항상 세트로 돌아다니는 개념이다. map이 하나씩 값을 가져와 가공한다면 reduce는 누진적으로 값을 계산해나가는 과정을 뜻한다.

14. fold

fold 연산자는 reduce와 유사하지만, 초깃값이 주어진다는 차이점이 있다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun someCalc(i: Int): Int {
    delay(10L)
    return i * 2
}

fun main() = runBlocking<Unit> {
    val value = (1..10)
        .asFlow()
        .fold(10) { a, b ->
            a + b
        }
    println(value)
}

출력 결과

1
65

reduce의 결과값이 55인만큼 초깃값 10을 더해서 65를 출력한다.

15. count

count는 스트림되는 값 중 조건을 만족하는 값의 갯수를 출력해준다.

import kotlin.random.Random
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val counter = (1..10)
        .asFlow()
        .count {
            (it % 2) == 0
        }
    println(counter)
}

출력 결과

1
5

1부터 10까지의 정수 중 짝수의 개수인 5를 출력한다.