(Kotlin Coroutines) 016. Channel Fan-out & Fan-in

Kotlin Coroutines - Channel Fan-out & Fan-in

여러 개의 코루틴이 같은 채널에서 값을 수신하거나, 하나의 코루틴이 하나의 채널에 값을 송신하는 구조도 가능하다.

코루틴에서는 전자를 Fan-out, 후자를 Fan-in 이라고 부른다.

1. Fan-out

먼저 Fan-out에 대해서 알아보자.

Fan-out 구조는 여러 개의 코루틴이 동일한 채널에서 값을 수신 받아 작업을 분배하여 처리할 수 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) {
        send(x++)
        delay(100L)
    }
}

fun CoroutineScope.processNumber(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("Processor #${id} received ${it}")
    }
}


fun main() = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat (5) {
        processNumber(it, producer)
    }
    delay(1000L)
    producer.cancel()
}

출력 결과

1
2
3
4
5
6
7
8
9
10
Processor #0 received 1
Processor #0 received 2
Processor #1 received 3
Processor #2 received 4
Processor #3 received 5
Processor #4 received 6
Processor #0 received 7
Processor #1 received 8
Processor #2 received 9
Processor #3 received 10

processNumber() 함수를 호출해서 만들어진 코루틴은 repeat 안에서 호출되기때문에 5개가 생성된다.

이렇게 생성된 5개의 코루틴은 produceNumbers() 함수로 생성되는 정수들을 나누어서 수신하게 된다.

출력 결과를 통해 produceNumbers()의 지연이 0.1초이고, 메인 함수의 runBlocking이 1초의 지연시간을 가지므로 1부터 10까지 10개의 값이 send되고 이를 5개의 코루틴이 나누어서 수신받는 것을 확인할 수 있다.

2. Fan-in

Fan-out과 반대로 Fan-in은 데이터를 생산하는 코루틴이 여러 개이고, 이를 하나의 채널로 스트리밍되는 구조이다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

suspend fun produceNumbers(channel: SendChannel<Int>, from: Int, interval: Long) {
    var x = from
    while (true) {
        channel.send(x)
        x += 2
        delay(interval)
    }
}

fun CoroutineScope.processNumber(channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("received ${it}")
    }
}


fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        produceNumbers(channel, 1, 100L)
    }
    launch {
        produceNumbers(channel, 2, 150L)
    }
    processNumber(channel)
    delay(1000L)
    coroutineContext.cancelChildren()
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
received 1
received 2
received 3
received 4
received 5
received 6
received 7
received 9
received 8
received 11
received 10
received 13
received 15
received 12
received 17
received 14
received 19

produceNumbers() 함수를 두 개의 launch 블록에서 각각 호출하고 있다.

주어지는 파라미터에 따라

1, 3, 5, 7, 9 … (per 100ms)
2, 4, 6, 8, 10 … (per 150ms)

와 같이 값을 생산하여 마찬가지로 파라미터로 주어진 채널로 send한다.

즉 하나의 채널에 두 개의 코루틴이 값을 송신하고 있다.

이후 같은 채널을 processNumber() 함수가 consumeEach로 값을 수신받아 출력한다.

3. 채널은 공평하다 (Channels are fair)

하나의 채널에 여러 개의 코루틴이 송수신 동작을 수행하는 경우

채널은 이를 선입선출로 처리한다.

아래 예제는 “ping”과 “pong”이라는 이름을 가진 두 개의 코루틴이 채널 table에서 Ball 객체를 공유하며 번갈아가면서 출력하는 것을 보여준다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

data class Ball(var hits: Int)

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

출력 결과

1
2
3
4
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

4. select

여러 개의 코루틴이 데이터를 생산하는 경우, 가장 먼저 완료된 결과를 바로 처리해야하는 경우가 있다.

select 표현식은 가장 먼저 완료된 결과를 곧바로 이용할 수 있게 처리해주는 함수이다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

fun CoroutineScope.ping() = produce<String> {
    while (true) {
        delay(100L)
        send("ping")
    }
}

fun CoroutineScope.pong() = produce<String> {
    while (true) {
        delay(150L)
        send("pong")
    }
}

fun main() = runBlocking<Unit> {
    val ping = ping()
    val pong = pong()
    repeat (5) {
        select<Unit> {
            ping.onReceive { println("$it") }
            pong.onReceive { println("$it") }
        }
    }
    coroutineContext.cancelChildren()
}

출력 결과

1
2
3
4
5
ping
pong
ping
ping
pong

ping()은 0.1초마다 “ping”을, pong()은 0.15초마다 “pong”을 send 해준다.

타임라인을 그려보면 아래와 같다.

0.1초 0.15초 0.2초 0.3초 0.3초
ping pong ping ping pong

물론 0.3초에 동시에 send 되는 경우 출력 결과가 아래와 같이 바뀔 수도 있다.

0.1초 0.15초 0.2초 0.3초 0.3초
ping pong ping pong ping

결론적으로 채널에 먼저 send된 녀석을 바로 처리하게 해주는 표현식이라고 볼 수 있다.

이와 같은 select 표현식은 아래와 같은 상황에서 적절하게 사용할 수 있다.

List of supported select methods

Receiver Suspending function Select clause
Job join onJoin
Deferred await onAwait
SendChannel send onSend
ReceiveChannel receive onReceive
ReceiveChannel receiveCatching onReceiveCatching
none delay onTimeout

참고 kotlinx-coroutines-core/kotlinx.coroutines.selects/select

채널 버퍼링

예제 91: 버퍼

이전에 만들었던 예제를 확장하여 버퍼를 지정해보자. Channel 생성자는 인자로 버퍼의 사이즈를 지정 받는다.

지정하지 않으면 버퍼를 생성하지 않는다.

import kotlinx.coroutines.* import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
val channel = Channel<Int>(10)
launch {
for (x in 1..20) {
println(“${x} 전송중”)
channel.send(x)
}
channel.close()
}

for (x in channel) {
    println("${x} 수신")
    delay(100L)
}
println("완료")

}

채널에 인자로 10을 지정했다. 10개까지는 수신자가 받지 않아도 계속 전송한다.

예제 92: 랑데뷰

버퍼 사이즈를 랑데뷰(Channel.RENDEZVOUS)로 지정해봅시다.

import kotlinx.coroutines.* import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
val channel = Channel<Int>(Channel.RENDEZVOUS)
launch {
for (x in 1..20) {
println(“${x} 전송중”)
channel.send(x)
}
channel.close()
}

for (x in channel) {
    println("${x} 수신")
    delay(100L)
}
println("완료")

}

랑데뷰는 버퍼 사이즈를 0으로 지정하는 것입니다. 생성자에 사이즈를 전달하지 않으면 랑데뷰가 디폴트입니다.

이외에도 사이즈 대신 사용할 수 있는 다른 설정 값이 있습니다.

  • UNLIMITED - 무제한으로 설정
  • CONFLATED - 오래된 값이 지워짐.
  • BUFFERED - 64개의 버퍼. 오버플로우엔 suspend

예제 93: 버퍼 오버플로우

버퍼의 오버플로우 정책에 따라 다른 결과가 나올 수 있습니다.

import kotlinx.coroutines.* import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
val channel = Channel<Int>(2, BufferOverflow.DROP_OLDEST)
launch {
for (x in 1..50) {
channel.send(x)
}
channel.close()
}

delay(500L)

for (x in channel) {
    println("${x} 수신")
    delay(100L)
}
println("완료")

}

  • SUSPEND - 잠이 들었다 깨어납니다.
  • DROP_OLDEST - 예전 데이터를 지웁니다.
  • DROP_LATEST - 새 데이터를 지웁니다.