Kotlin Coroutines - Flow basics
이번 포스팅에서는 코루틴의 플로우에 대해서 알아보자.
플로우는 리액티브 프로그래밍의 데이터 스트림을 구현하기 위한 라이브러리이다.
이 데이터 스트림은 크게 3가지 항목으로 구성된다.
![Entities involved in streams of data: consumer, optional intermediaries, and producer.](https://developer.android.com/images/kotlin/flow/flow-entities.png)
1. 생산자(producer)
데이터 스트림에 추가되는 데이터를 생산한다.
코루틴 플로우를 이용해 비동기적으로 데이터가 생산될 수도 있다.
2. 중개자(intermediary)
스트림에 내보내는 각각의 값이나 스트림 자체를 수정한다.
중개자의 경우 Optional로, Flow에 꼭 필요한 것은 아니다.
3. 소비자(consumer)
스트림의 데이터를 사용하는 주체이다.
간략하게 개념을 잡았으면 이제 예제를 통해 코루틴 플로우를 익혀보도록 하자.
1. Hello, Flow!
상술했듯 플로우는 코루틴을 기반으로 비동기 스트림을 제어할 수 있다.
아래 간단한 예제를 보자.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun doFlow(): Flow<Int> = flow { repeat(10) { emit(Random.nextInt(0, 500)) delay(10L) } } fun main() = runBlocking { doFlow().collect { value -> println(value) } }
출력 결과
1 | 326 |
doFlow()
메서드는 Flow<Int>
타입을 반환하고 있다.
즉 플로우가 반환하는 타입은 Flow<T>
형태의 객체임을 알 수 있다.
여기서 Flow<Int>
의 Int
는 플로우가 정수 타입을 emit
하고 있음을 알 수 있다.
runBlocking
블록처럼 flow
도 블럭을 가진 Flow Builder 역할을 수행한다.
코드 내용을 살펴보면 0에서 500 사이의 난수를 0.01초씩 지연하면서 10번 반복하여 스트림으로 emit
하는 코드임을 알 수 있다.
여기서 emit
되는 값들을 얻어오기 위해 collect
를 사용해야한다.
플로우는 콜드 스트림이기 때문에 collect
와 같은 함수를 호출하여 요청 해주지어야 값을 하나씩 스트리밍하기 때문이다.
참고 콜드 스트림에 대치되는 핫 스트림도 있다. 핫 스트림은 요청 전에 이미 활성화되어 0개 이상의 소비자들에게 값을 스트리밍한다.
2. withTimeoutOrNull
코루틴을 일정시간 이후 취소처리를 해주는 withTimeoutOrNull
함수가 있었다.
플로우도 마찬가지로 withTimeout
함수나 withTimeoutOrNull
함수를 이용해 취소할 수 있다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun doFlow(): Flow<Int> = flow { repeat(10) { emit(Random.nextInt(0, 500)) delay(100L) } } fun main() = runBlocking<Unit> { val result = withTimeoutOrNull(500L) { doFlow().collect { value -> println(value) } true } ?: false if (!result) { println("Canceled.") } }
출력 결과
1 | 416 |
3. flowOf
위에서 언급한 플로우 빌더로 flow
만 있는 것은 아니다.
다른 함수를 이용해 플로우를 만들 수 있는 flowOf
에 대해서 알아보자.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { flowOf(1, 2, 3, 4, 5).collect { value -> println(value) } }
출력 결과
1 | 1 |
flowOf
의 차이는 고정된 값들을 emit
하는 플로우를 만들어낼 수 있다.
만약 flow
빌더로 고정된 값들을 emit
하려면 아래와 같이 작업해야 것이다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { // flowOf(1, 2, 3, 4, 5).collect { value -> // println(value) // } flow { emit(1) emit(2) emit(3) emit(4) emit(5) }.collect { value -> println(value) } }
출력 결과
1 | 1 |
4. asFlow
flow
, flowOf
이외에도 asFlow
를 통해 플로우를 만들어낼 수 있다.
asFlow
는 컬렉션이나 시퀀스를 전달하여, 해당 객체를 구성하는 원소값들을 스트리밍해준다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { println("listOf().asFlow()") listOf(1, 2, 3, 4, 5).asFlow().collect { value -> println(value) } println("{Sequence}.asFlow()") (6..10).asFlow().collect { println(it) } }
출력 결과
1 | listOf().asFlow() |
5. map
플로우에서 스트리밍되는 값들을 중간연산자를 통해 변환할 수 있다.
예제를 통해 알아보도록 하자.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun doFlow(): Flow<Int> = flow { repeat(10) { emit(Random.nextInt(0, 500)) delay(10L) } } fun main() = runBlocking { doFlow().map { "$it ${it * 2}" }.collect { value -> println(value) } }
출력 결과
1 | 411 822 |
doFlow()
메서드에 붙은 map
연산자를 통해 기존 플로우를 새로운 플로우로 만들어낸 뒤,
새로운 플로우에서 collect
를 호출하여 반환되는 값들을 출력하였다.
6. filter
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { (1..20).asFlow().filter { (it % 2) == 0 }.collect { println(it) } }
출력 결과
1 | 2 |
코틀린 유저라면 어느정도 친숙할 filter
연산자를 통해 값을 가공할 수 있다.
출력 결과를 보면 1부터 20까지 순회하는 시퀀스에서 필터링 조건인 짝수에 해당하는 값들만 collect
블록에 전달되는 것을 볼 수 있다.
7. filterNot
위의 예제에서 짝수 대신 홀수만 출력하려면 어떻게 해야할까?
filter
연산자의 조건을 변경할 수도 있겠지만, 완전히 반대되는 조건이 필요한 경우 filterNot
을 사용하면 된다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { (1..20).asFlow().filterNot { (it % 2) == 0 }.collect { println(it) } }
출력 결과
1 | 1 |
filter
예제와 달리 홀수만 출력되는 것을 확인할 수 있다.
8. transform
transform
연산자를 활용하면 좀 더 유연하게 스트림을 변경할 수 있다.
map
이나 filter
보다 조금 더 복잡한 변경이 필요한 경우 사용한다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (1..10).asFlow().transform { emit(it) emit(someCalc(it)) }.collect { println(it) } }
출력 결과
1 | 1 |
1부터 10까지 순회하는 시퀀스를 통해 얻은 1~10까지의 값을 emit
하고 직후 2배의 값을 emit
하여 총 20개의 값이 출력된 것을 확인할 수 있다.
예제에서 두 번의 emit
을 사용한 것처럼 transform
을 사용하면 여러 개의 emit
을 추가하여 여러 변환을 스트리밍하게 해준다.
9. take
take
연산자를 쓰면 스트리밍된 데이터 중에서 최초 N개의 결과만 선택적으로 취합할 수가 있다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (1..10).asFlow().transform { emit(it) emit(someCalc(it)) }.take(5) .collect { println(it) } }
출력 결과
1 | 1 |
take(5)
로 인해서 emit
되는 값들 중 최초 5개만 출력된 것을 확인할 수 있다.
10. takeWhile
take
를 통해 특정 개수의 값을 가져올 순 있지만, 활용도가 떨어질 것이다.
좀 더 활용도를 높이기 위해 takeWhile
연산자를 이용해 조건을 부여하여 값을 얻어올 수 있다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (1..10).asFlow().transform { emit(it) emit(someCalc(it)) }.takeWhile { it >= 5 }.collect { println(it) } }
출력 결과
1 | 출력 결과 없음 |
아무것도 출력이 되지 않는데 이는 조건에 맞지않는 값이 나오면 즉시 flow가 중지되기 때문이다.
출력이 되도록 코드를 수정해보자.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (5..10).asFlow().transform { // HERE emit(it) emit(someCalc(it)) }.takeWhile { it >= 5 }.collect { println(it) } }
출력 결과
1 | 5 |
코드의 의도대로 잘 출력되었음을 확인할 수 있다.
11. drop
take
가 주어진 크기만큼 스트림되는 값을 취한다면, 그 반대로 주어진 크기만큼 스트림되는 값을 버리는 drop
연산자도 있다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (1..10).asFlow().transform { emit(it) emit(someCalc(it)) }.drop(5) .collect { println(it) } }
출력 결과
1 | 6 |
12. dropWhile
take
에 대응되는 drop
이 있듯이, takeWhile
에 대응되는 dropWhile
도 존재한다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (1..10).asFlow().transform { emit(it) emit(someCalc(it)) }.dropWhile { it < 5 }.collect { println(it) } }
출력 결과
1 | 6 |
스트리밍된 값증 1, 2, 2, 4, 3 까진 전부 드랍한 뒤, 6이 나오고 나서 조건문이 해제되어 이후 스트리밍된 값들이 전부 출력된다.
13. reduce
여태 예제에서 호출한 collect
함수는 값을 다 받아오고 플로우를 끝내는 역할을 하기에 종단 연산자라고 불린다.
이 종단 연산자에는 collect
만 있는 것이 아니라 reduce
fold
등의 연산자와 toList
toSet
과 같이 컬렉션으로 변환해주는 함수들도 존재한다.
먼저 reduce
에 대해서 예제를 살펴보자.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { val value = (1..10) .asFlow() .reduce { a, b -> a + b } println(value) }
출력 결과
1 | 55 |
reduce
가 호출이 되면 첫 번째, 두 번째 스트림되는 값을 입력 받는다.
위의 코드에서는 a에는 1, b에는 2가 주어지고, 이를 결합해 3을 반환하게 되고,
이후 주어지는 3을 받아와 기존에 가지고 있던 3에 더하는 식으로 반복된다.
즉 1부터 10까지의 총합을 구하는 로직과 동일하게 되어 55를 반환하게 된다.
추상적으로 표현하자면, 누직적으로 값을 계산해나가는 연산자라고 볼 수 있다.
참고
map
과reduce
는 항상 세트로 돌아다니는 개념이다.map
이 하나씩 값을 가져와 가공한다면reduce
는 누진적으로 값을 계산해나가는 과정을 뜻한다.
14. fold
fold
연산자는 reduce
와 유사하지만, 초깃값이 주어진다는 차이점이 있다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { val value = (1..10) .asFlow() .fold(10) { a, b -> a + b } println(value) }
출력 결과
1 | 65 |
reduce
의 결과값이 55인만큼 초깃값 10을 더해서 65를 출력한다.
15. count
count
는 스트림되는 값 중 조건을 만족하는 값의 갯수를 출력해준다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { val counter = (1..10) .asFlow() .count { (it % 2) == 0 } println(counter) }
출력 결과
1 | 5 |
1부터 10까지의 정수 중 짝수의 개수인 5를 출력한다.