(Kotlin Coroutines) 017. Buffered channels

Kotlin Coroutines - Buffered channels

앞서 언급했듯, 채널은 기본적으로 중단점을 가지고 있다.

이때 버퍼링을 사용하면 유휴상태와 상관없이 채널을 사용할 수 있다.

지금까지의 예제중에 버퍼를 쓴적이 없듯이 채널에서 버퍼를 사용하려면 명시적으로 생성해주어야 한다.

1. buffer

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {

    val channel = Channel<Int>(5) // create buffered channel
    launch {
        for (x in 1..10) {
            println("Sending ${x}") // print before sending each element
            channel.send(x) // will suspend when buffer is full
        }
        channel.close()
    }

    for (x in channel) {
        println("Received ${x}")
        delay(100L)
    }
    println("Done!")
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Sending 1
Sending 2
Sending 3
Sending 4
Sending 5
Sending 6
Sending 7
Received 1
Received 2
Sending 8
Received 3
Sending 9
Received 4
Sending 10
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10
Done!

채널을 초기화하면서 버퍼 사이즈를 부여하면 수신자가 있든 없든 채널로 계속 송신한다.

예제에서는 버퍼 사이즈를 5로 설정했기 때문에

출력 결과에서 숫자 5를 send하기 전까진 receive가 없어도 연달아 송신하는 것을 알 수 있다.

2. 랑데뷰(rendezvous)

1과 동일한 예제의 버퍼 사이즈를 5에서 랑데뷰로 변경해보자.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {

    val channel = Channel<Int>(Channel.RENDEZVOUS) // HERE
    launch {
        for (x in 1..10) {
            println("Sending ${x}") // print before sending each element
            channel.send(x) // will suspend when buffer is full
        }
        channel.close()
    }

    for (x in channel) {
        println("Received ${x}")
        delay(100L)
    }
    println("Done!")
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Sending 1
Sending 2
Received 1
Received 2
Sending 3
Received 3
Sending 4
Received 4
Sending 5
Received 5
Sending 6
Received 6
Sending 7
Received 7
Sending 8
Received 8
Sending 9
Received 9
Sending 10
Received 10
Done!

버퍼 사이즈를 랑데뷰로 지정하면 버퍼 사이즈를 0으로 지정하는 것과 같다.

즉 랑데뷰는 버퍼 사이즈를 명시하지않은것과 동일한 것이다.

이외에도 아래와 같은 사이즈 지정자들이 존재한다.

  • Channel.UNLIMITED : 버퍼 사이즈를 무제한으로 설정한다. 물론 메모리 부족시 RuntimeError가 발생한다.
  • Channel.CONFLATED : 버퍼에서 처리하지못한 오래된 값을 버리고, 새로운 값으로 채운다.
  • Channel.BUFFERED : 버퍼 사이즈를 64로 설정한다. 65개부터는 suspend 처리된다.

3. Buffer Overflow

지금까지 알아보았듯 코루틴 플로우를 사용하여 스트림을 생성하고,

이를 이용해 데이터를 처리하는 것은 효율적이라는 것은 인지하였다.

하지만 데이터의 생산 속도보다 소비 속도가 느린 경우 필연적으로 지연을 발생할수 밖에 없다.

특히 flowcollect는 하나의 코루틴에서 생산과 소비가 동시에 일어나고,

소비가 끝난 다음에 생산이 일어나기에 더욱 심한 지연이 발생하게 될 것이다.

이처럼 지연을 줄이기 위해 생산과 소비를 분리하는 것은 buffer를 통해 구현할 수 있다.

자 버퍼의 도입 목적을 다시 상기했다면 이번엔 버퍼가 넘치는 케이스에 어떻게 대응할지도 고민해보아야 한다.

이때 정해진 사이즈의 버퍼가 넘치는 것을 버퍼 오버플로우(Buffer Overflow) 라고 부른다.

먼저 예제를 보자.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {

    val channel = Channel<Int>(5, BufferOverflow.DROP_OLDEST) // HERE
    launch {
        for (x in 1..30) {
            println("Sending ${x}") // print before sending each element
            channel.send(x) // will suspend when buffer is full
        }
        channel.close()
    }

    delay(500L)

    for (x in channel) {
        println("Received ${x}")
        delay(100L)
    }
    println("Done!")
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Sending 1
Sending 2
Sending 3
Sending 4
Sending 5
Sending 6
Sending 7
Sending 8
Sending 9
Sending 10
Sending 11
Sending 12
Sending 13
Sending 14
Sending 15
Sending 16
Sending 17
Sending 18
Sending 19
Sending 20
Sending 21
Sending 22
Sending 23
Sending 24
Sending 25
Sending 26
Sending 27
Sending 28
Sending 29
Sending 30
Received 26
Received 27
Received 28
Received 29
Received 30
Done!

사이즈가 5인 버퍼에 오버플로우 전략을 BufferOverflow.DROP_OLDEST으로 지정하였다.

버퍼가 초과되면 오래된 값을 전부 버리기때문에 지연시간 동안 생산된 1부터 25까지의 값이 모두 드랍된 것을 확인할 수 있다.

코루틴의 버퍼 오버플로우 전략은 3가지이다.

  • BufferOverflow.DROP_OLDEST : 오래된 데이터를 버리고 새로운 데이터를 채운다.
  • BufferOverflow.DROP_LATEST : 버퍼가 가득 차면 새로운 데이터를 버린다.
  • BufferOverflow.SUSPEND : 유휴상태에 빠졌다가 다시 깨어난다.

만약 예제 코드를 각각의 오버플로우 전략에 맞추어 출력한다면 아래와 같이 출력될 것이다.

  • BufferOverflow.DROP_OLDEST : 26 ~ 30까지 출력
  • BufferOverflow.DROP_LATEST : 1 ~ 5 까지 출력
  • BufferOverflow.SUSPEND : 1 ~ 30까지 출력

참고 kotlinx-coroutines-core/kotlinx.coroutines.channels/BufferOverflow