Kotlin Coroutines - Channel Fan-out & Fan-in
여러 개의 코루틴이 같은 채널에서 값을 수신하거나, 하나의 코루틴이 하나의 채널에 값을 송신하는 구조도 가능하다.
코루틴에서는 전자를 Fan-out, 후자를 Fan-in 이라고 부른다.
1. Fan-out
먼저 Fan-out에 대해서 알아보자.
Fan-out 구조는 여러 개의 코루틴이 동일한 채널에서 값을 수신 받아 작업을 분배하여 처리할 수 있다.
import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) { send(x++) delay(100L) } } fun CoroutineScope.processNumber(id: Int, channel: ReceiveChannel<Int>) = launch { channel.consumeEach { println("Processor #${id} received ${it}") } } fun main() = runBlocking<Unit> { val producer = produceNumbers() repeat (5) { processNumber(it, producer) } delay(1000L) producer.cancel() }
출력 결과
1 | Processor #0 received 1 |
processNumber()
함수를 호출해서 만들어진 코루틴은 repeat
안에서 호출되기때문에 5개가 생성된다.
이렇게 생성된 5개의 코루틴은 produceNumbers()
함수로 생성되는 정수들을 나누어서 수신하게 된다.
출력 결과를 통해 produceNumbers()
의 지연이 0.1초이고, 메인 함수의 runBlocking
이 1초의 지연시간을 가지므로 1부터 10까지 10개의 값이 send
되고 이를 5개의 코루틴이 나누어서 수신받는 것을 확인할 수 있다.
2. Fan-in
Fan-out과 반대로 Fan-in은 데이터를 생산하는 코루틴이 여러 개이고, 이를 하나의 채널로 스트리밍되는 구조이다.
import kotlinx.coroutines.* import kotlinx.coroutines.channels.* suspend fun produceNumbers(channel: SendChannel<Int>, from: Int, interval: Long) { var x = from while (true) { channel.send(x) x += 2 delay(interval) } } fun CoroutineScope.processNumber(channel: ReceiveChannel<Int>) = launch { channel.consumeEach { println("received ${it}") } } fun main() = runBlocking<Unit> { val channel = Channel<Int>() launch { produceNumbers(channel, 1, 100L) } launch { produceNumbers(channel, 2, 150L) } processNumber(channel) delay(1000L) coroutineContext.cancelChildren() }
출력 결과
1 | received 1 |
produceNumbers()
함수를 두 개의 launch
블록에서 각각 호출하고 있다.
주어지는 파라미터에 따라
1, 3, 5, 7, 9 … (per 100ms)
2, 4, 6, 8, 10 … (per 150ms)
와 같이 값을 생산하여 마찬가지로 파라미터로 주어진 채널로 send
한다.
즉 하나의 채널에 두 개의 코루틴이 값을 송신하고 있다.
이후 같은 채널을 processNumber()
함수가 consumeEach
로 값을 수신받아 출력한다.
3. 채널은 공평하다 (Channels are fair)
하나의 채널에 여러 개의 코루틴이 송수신 동작을 수행하는 경우
채널은 이를 선입선출로 처리한다.
아래 예제는 “ping”과 “pong”이라는 이름을 가진 두 개의 코루틴이 채널 table
에서 Ball
객체를 공유하며 번갈아가면서 출력하는 것을 보여준다.
import kotlinx.coroutines.* import kotlinx.coroutines.channels.* data class Ball(var hits: Int) suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { // receive the ball in a loop ball.hits++ println("$name $ball") delay(300) // wait a bit table.send(ball) // send the ball back } } fun main() = runBlocking { val table = Channel<Ball>() // a shared table launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) // serve the ball delay(1000) // delay 1 second coroutineContext.cancelChildren() // game over, cancel them }
출력 결과
1 | ping Ball(hits=1) |
4. select
여러 개의 코루틴이 데이터를 생산하는 경우, 가장 먼저 완료된 결과를 바로 처리해야하는 경우가 있다.
select
표현식은 가장 먼저 완료된 결과를 곧바로 이용할 수 있게 처리해주는 함수이다.
import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* fun CoroutineScope.ping() = produce<String> { while (true) { delay(100L) send("ping") } } fun CoroutineScope.pong() = produce<String> { while (true) { delay(150L) send("pong") } } fun main() = runBlocking<Unit> { val ping = ping() val pong = pong() repeat (5) { select<Unit> { ping.onReceive { println("$it") } pong.onReceive { println("$it") } } } coroutineContext.cancelChildren() }
출력 결과
1 | ping |
ping()
은 0.1초마다 “ping”을, pong()
은 0.15초마다 “pong”을 send
해준다.
타임라인을 그려보면 아래와 같다.
0.1초 | 0.15초 | 0.2초 | 0.3초 | 0.3초 |
---|---|---|---|---|
ping | pong | ping | ping | pong |
물론 0.3초에 동시에 send
되는 경우 출력 결과가 아래와 같이 바뀔 수도 있다.
0.1초 | 0.15초 | 0.2초 | 0.3초 | 0.3초 |
---|---|---|---|---|
ping | pong | ping | pong | ping |
결론적으로 채널에 먼저 send
된 녀석을 바로 처리하게 해주는 표현식이라고 볼 수 있다.
이와 같은 select
표현식은 아래와 같은 상황에서 적절하게 사용할 수 있다.
List of supported select methods
Receiver | Suspending function | Select clause |
---|---|---|
Job | join | onJoin |
Deferred | await | onAwait |
SendChannel | send | onSend |
ReceiveChannel | receive | onReceive |
ReceiveChannel | receiveCatching | onReceiveCatching |
none | delay | onTimeout |
참고 kotlinx-coroutines-core/kotlinx.coroutines.selects/select
채널 버퍼링
예제 91: 버퍼
이전에 만들었던 예제를 확장하여 버퍼를 지정해보자. Channel
생성자는 인자로 버퍼의 사이즈를 지정 받는다.
지정하지 않으면 버퍼를 생성하지 않는다.
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(10)
launch {
for (x in 1..20) {
println(“${x} 전송중”)
channel.send(x)
}
channel.close()
}
for (x in channel) {
println("${x} 수신")
delay(100L)
}
println("완료")
}
채널에 인자로 10
을 지정했다. 10개까지는 수신자가 받지 않아도 계속 전송한다.
예제 92: 랑데뷰
버퍼 사이즈를 랑데뷰(Channel.RENDEZVOUS)로 지정해봅시다.
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(Channel.RENDEZVOUS)
launch {
for (x in 1..20) {
println(“${x} 전송중”)
channel.send(x)
}
channel.close()
}
for (x in channel) {
println("${x} 수신")
delay(100L)
}
println("완료")
}
랑데뷰는 버퍼 사이즈를 0
으로 지정하는 것입니다. 생성자에 사이즈를 전달하지 않으면 랑데뷰가 디폴트입니다.
이외에도 사이즈 대신 사용할 수 있는 다른 설정 값이 있습니다.
UNLIMITED
- 무제한으로 설정CONFLATED
- 오래된 값이 지워짐.BUFFERED
- 64개의 버퍼. 오버플로우엔 suspend
예제 93: 버퍼 오버플로우
버퍼의 오버플로우 정책에 따라 다른 결과가 나올 수 있습니다.
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(2, BufferOverflow.DROP_OLDEST)
launch {
for (x in 1..50) {
channel.send(x)
}
channel.close()
}
delay(500L)
for (x in channel) {
println("${x} 수신")
delay(100L)
}
println("완료")
}
SUSPEND
- 잠이 들었다 깨어납니다.DROP_OLDEST
- 예전 데이터를 지웁니다.DROP_LATEST
- 새 데이터를 지웁니다.