Kotlin Coroutines - Flow basics
이번 포스팅에서는 코루틴의 플로우에 대해서 알아보자.
플로우는 리액티브 프로그래밍의 데이터 스트림을 구현하기 위한 라이브러리이다.
이 데이터 스트림은 크게 3가지 항목으로 구성된다.
1. 생산자(producer)
데이터 스트림에 추가되는 데이터를 생산한다.
코루틴 플로우를 이용해 비동기적으로 데이터가 생산될 수도 있다.
2. 중개자(intermediary)
스트림에 내보내는 각각의 값이나 스트림 자체를 수정한다.
중개자의 경우 Optional로, Flow에 꼭 필요한 것은 아니다.
3. 소비자(consumer)
스트림의 데이터를 사용하는 주체이다.
간략하게 개념을 잡았으면 이제 예제를 통해 코루틴 플로우를 익혀보도록 하자.
1. Hello, Flow!
상술했듯 플로우는 코루틴을 기반으로 비동기 스트림을 제어할 수 있다.
아래 간단한 예제를 보자.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun doFlow(): Flow<Int> = flow { repeat(10) { emit(Random.nextInt(0, 500)) delay(10L) } } fun main() = runBlocking { doFlow().collect { value -> println(value) } }
출력 결과
1 | 326 |
doFlow()
메서드는 Flow<Int>
타입을 반환하고 있다.
즉 플로우가 반환하는 타입은 Flow<T>
형태의 객체임을 알 수 있다.
여기서 Flow<Int>
의 Int
는 플로우가 정수 타입을 emit
하고 있음을 알 수 있다.
runBlocking
블록처럼 flow
도 블럭을 가진 Flow Builder 역할을 수행한다.
코드 내용을 살펴보면 0에서 500 사이의 난수를 0.01초씩 지연하면서 10번 반복하여 스트림으로 emit
하는 코드임을 알 수 있다.
여기서 emit
되는 값들을 얻어오기 위해 collect
를 사용해야한다.
플로우는 콜드 스트림이기 때문에 collect
와 같은 함수를 호출하여 요청 해주지어야 값을 하나씩 스트리밍하기 때문이다.
참고 콜드 스트림에 대치되는 핫 스트림도 있다. 핫 스트림은 요청 전에 이미 활성화되어 0개 이상의 소비자들에게 값을 스트리밍한다.
2. withTimeoutOrNull
코루틴을 일정시간 이후 취소처리를 해주는 withTimeoutOrNull
함수가 있었다.
플로우도 마찬가지로 withTimeout
함수나 withTimeoutOrNull
함수를 이용해 취소할 수 있다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun doFlow(): Flow<Int> = flow { repeat(10) { emit(Random.nextInt(0, 500)) delay(100L) } } fun main() = runBlocking<Unit> { val result = withTimeoutOrNull(500L) { doFlow().collect { value -> println(value) } true } ?: false if (!result) { println("Canceled.") } }
출력 결과
1 | 416 |
3. flowOf
위에서 언급한 플로우 빌더로 flow
만 있는 것은 아니다.
다른 함수를 이용해 플로우를 만들 수 있는 flowOf
에 대해서 알아보자.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { flowOf(1, 2, 3, 4, 5).collect { value -> println(value) } }
출력 결과
1 | 1 |
flowOf
의 차이는 고정된 값들을 emit
하는 플로우를 만들어낼 수 있다.
만약 flow
빌더로 고정된 값들을 emit
하려면 아래와 같이 작업해야 것이다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { // flowOf(1, 2, 3, 4, 5).collect { value -> // println(value) // } flow { emit(1) emit(2) emit(3) emit(4) emit(5) }.collect { value -> println(value) } }
출력 결과
1 | 1 |
4. asFlow
flow
, flowOf
이외에도 asFlow
를 통해 플로우를 만들어낼 수 있다.
asFlow
는 컬렉션이나 시퀀스를 전달하여, 해당 객체를 구성하는 원소값들을 스트리밍해준다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { println("listOf().asFlow()") listOf(1, 2, 3, 4, 5).asFlow().collect { value -> println(value) } println("{Sequence}.asFlow()") (6..10).asFlow().collect { println(it) } }
출력 결과
1 | listOf().asFlow() |
5. map
플로우에서 스트리밍되는 값들을 중간연산자를 통해 변환할 수 있다.
예제를 통해 알아보도록 하자.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun doFlow(): Flow<Int> = flow { repeat(10) { emit(Random.nextInt(0, 500)) delay(10L) } } fun main() = runBlocking { doFlow().map { "$it ${it * 2}" }.collect { value -> println(value) } }
출력 결과
1 | 411 822 |
doFlow()
메서드에 붙은 map
연산자를 통해 기존 플로우를 새로운 플로우로 만들어낸 뒤,
새로운 플로우에서 collect
를 호출하여 반환되는 값들을 출력하였다.
6. filter
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { (1..20).asFlow().filter { (it % 2) == 0 }.collect { println(it) } }
출력 결과
1 | 2 |
코틀린 유저라면 어느정도 친숙할 filter
연산자를 통해 값을 가공할 수 있다.
출력 결과를 보면 1부터 20까지 순회하는 시퀀스에서 필터링 조건인 짝수에 해당하는 값들만 collect
블록에 전달되는 것을 볼 수 있다.
7. filterNot
위의 예제에서 짝수 대신 홀수만 출력하려면 어떻게 해야할까?
filter
연산자의 조건을 변경할 수도 있겠지만, 완전히 반대되는 조건이 필요한 경우 filterNot
을 사용하면 된다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { (1..20).asFlow().filterNot { (it % 2) == 0 }.collect { println(it) } }
출력 결과
1 | 1 |
filter
예제와 달리 홀수만 출력되는 것을 확인할 수 있다.
8. transform
transform
연산자를 활용하면 좀 더 유연하게 스트림을 변경할 수 있다.
map
이나 filter
보다 조금 더 복잡한 변경이 필요한 경우 사용한다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (1..10).asFlow().transform { emit(it) emit(someCalc(it)) }.collect { println(it) } }
출력 결과
1 | 1 |
1부터 10까지 순회하는 시퀀스를 통해 얻은 1~10까지의 값을 emit
하고 직후 2배의 값을 emit
하여 총 20개의 값이 출력된 것을 확인할 수 있다.
예제에서 두 번의 emit
을 사용한 것처럼 transform
을 사용하면 여러 개의 emit
을 추가하여 여러 변환을 스트리밍하게 해준다.
9. take
take
연산자를 쓰면 스트리밍된 데이터 중에서 최초 N개의 결과만 선택적으로 취합할 수가 있다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (1..10).asFlow().transform { emit(it) emit(someCalc(it)) }.take(5) .collect { println(it) } }
출력 결과
1 | 1 |
take(5)
로 인해서 emit
되는 값들 중 최초 5개만 출력된 것을 확인할 수 있다.
10. takeWhile
take
를 통해 특정 개수의 값을 가져올 순 있지만, 활용도가 떨어질 것이다.
좀 더 활용도를 높이기 위해 takeWhile
연산자를 이용해 조건을 부여하여 값을 얻어올 수 있다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (1..10).asFlow().transform { emit(it) emit(someCalc(it)) }.takeWhile { it >= 5 }.collect { println(it) } }
출력 결과
1 | 출력 결과 없음 |
아무것도 출력이 되지 않는데 이는 조건에 맞지않는 값이 나오면 즉시 flow가 중지되기 때문이다.
출력이 되도록 코드를 수정해보자.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (5..10).asFlow().transform { // HERE emit(it) emit(someCalc(it)) }.takeWhile { it >= 5 }.collect { println(it) } }
출력 결과
1 | 5 |
코드의 의도대로 잘 출력되었음을 확인할 수 있다.
11. drop
take
가 주어진 크기만큼 스트림되는 값을 취한다면, 그 반대로 주어진 크기만큼 스트림되는 값을 버리는 drop
연산자도 있다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (1..10).asFlow().transform { emit(it) emit(someCalc(it)) }.drop(5) .collect { println(it) } }
출력 결과
1 | 6 |
12. dropWhile
take
에 대응되는 drop
이 있듯이, takeWhile
에 대응되는 dropWhile
도 존재한다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { (1..10).asFlow().transform { emit(it) emit(someCalc(it)) }.dropWhile { it < 5 }.collect { println(it) } }
출력 결과
1 | 6 |
스트리밍된 값증 1, 2, 2, 4, 3 까진 전부 드랍한 뒤, 6이 나오고 나서 조건문이 해제되어 이후 스트리밍된 값들이 전부 출력된다.
13. reduce
여태 예제에서 호출한 collect
함수는 값을 다 받아오고 플로우를 끝내는 역할을 하기에 종단 연산자라고 불린다.
이 종단 연산자에는 collect
만 있는 것이 아니라 reduce
fold
등의 연산자와 toList
toSet
과 같이 컬렉션으로 변환해주는 함수들도 존재한다.
먼저 reduce
에 대해서 예제를 살펴보자.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { val value = (1..10) .asFlow() .reduce { a, b -> a + b } println(value) }
출력 결과
1 | 55 |
reduce
가 호출이 되면 첫 번째, 두 번째 스트림되는 값을 입력 받는다.
위의 코드에서는 a에는 1, b에는 2가 주어지고, 이를 결합해 3을 반환하게 되고,
이후 주어지는 3을 받아와 기존에 가지고 있던 3에 더하는 식으로 반복된다.
즉 1부터 10까지의 총합을 구하는 로직과 동일하게 되어 55를 반환하게 된다.
추상적으로 표현하자면, 누직적으로 값을 계산해나가는 연산자라고 볼 수 있다.
참고
map
과reduce
는 항상 세트로 돌아다니는 개념이다.map
이 하나씩 값을 가져와 가공한다면reduce
는 누진적으로 값을 계산해나가는 과정을 뜻한다.
14. fold
fold
연산자는 reduce
와 유사하지만, 초깃값이 주어진다는 차이점이 있다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun someCalc(i: Int): Int { delay(10L) return i * 2 } fun main() = runBlocking<Unit> { val value = (1..10) .asFlow() .fold(10) { a, b -> a + b } println(value) }
출력 결과
1 | 65 |
reduce
의 결과값이 55인만큼 초깃값 10을 더해서 65를 출력한다.
15. count
count
는 스트림되는 값 중 조건을 만족하는 값의 갯수를 출력해준다.
import kotlin.random.Random import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { val counter = (1..10) .asFlow() .count { (it % 2) == 0 } println(counter) }
출력 결과
1 | 5 |
1부터 10까지의 정수 중 짝수의 개수인 5를 출력한다.