(Kotlin Coroutines) 014. Channel basics

Kotlin Coroutines - Channel basics

코루틴의 채널은 일종의 파이프로, 스트림 간에 데이터를 서로 주고받을 수 있다.

이 채널은 자바의 동시성 관련 패키지의 BlockingQueue와 매우 유사하며, put에 대응하는 send, take에 대응하는 receive 함수가 있는 만큼

기존에 동시성을 처리해본 사람이라면 금방 적응할 수 있을 것이다.

1. send

코루틴의 채널 객체를 생성하면 한쪽에서는 보내고(send), 한쪽에서는 받을 수(receive) 있게 된다.

아래는 정수를 송수신하는 채널의 예제이다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*


fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1 .. 5) {
            channel.send(x)
        }
    }

    repeat(5) {
        println(channel.receive())
    }
    println("Done!")
}

출력 결과

1
2
3
4
5
6
1
2
3
4
5
DONE!

여기서 쓰인 함수인 sendreceive는 모두 suspension point이다.

따라서 데이터를 송신하려고 해도 수신할 대상이 없으면 유휴상태로 대기하며

데이터를 수신하려고 해도 수신할 데이터가 없으면 유휴상태로 대기한다.

이처럼 코루틴 채널은 송수신자간의 상호작용을 통해 효율적으로 동작을 수행한다.

trySendtryReceive를 대신 사용하여 예외에 대응할 수도 있지만, 단 suspension point로 지정되지않음에 유의해서 사용해야 한다.

참고 기존엔 null을 체크하기 위한 offerpoll 함수가 있었으나, 예외가 발생했을 때의 혼란을 주는 요소가 있어 v1.5부터 제거되었다.

2. 같은 코루틴에서의 채널 접근(Access from the same coroutine)

1의 예제를 조금 수정해보자.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*


fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1 .. 5) {
            channel.send(x)
        }

        repeat(5) {
            println(channel.receive())
        }
    }

    // repeat(5) {
    //     println(channel.receive())
    // }
    println("Done!")
}

출력 결과

1
Evaluation stopped while it's taking too long️

launch블록 안에서 채널에 대한 sendreceive를 둘 다 호출하면

둘 다 중단점을 가지고 의존적이기 때문에 무한 루프에 빠지게 된다.

3. close

채널이 더 이상 보낼 데이터가 없는 경우 close 함수로 채널을 닫을 수 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1 .. 5) {
            channel.send(x * x)
        }
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) {
        println(y)
    }
    println("Done!")
}

출력 결과

1
2
3
4
5
6
1
4
9
16
25
Done!

send를 통해 보내는 데이터를 보내는 채널이 언제 닫힐 지모르니 for 등의 루프를 이용해서 수신하는 것이 편리하다.

위의 예제처럼 for를 사용하면 receive를 사용하지않아도 데이터를 받을수 있는 것을 확인할 수 있다.

close를 호출하면 채널에 close token을 보내는 것으로 가정할 수 있으며, 이 토큰을 받으면 for가 중지된다.

따라서 토큰이 수신되기전까지 모든 데이터가 수신되었음을 보장받을 수 있다.

4. 채널을 이용한 생산자 소비자 패턴(Building channel producers)

생산자와 소비자 패턴을 일반적인 디자인 패턴으로, 코루틴은 이를 편하게 사용하기 위한 함수들이 존재하다.

채널을 이용하여 한 쪽에서 데이터를 보내고(생산자), 한 쪽에서 데이터를 받는(소비자)를 구현해보자.

참고 Java Design Patterns - Producer Consumer

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*


fun main() = runBlocking<Unit> {
    val oneToTen = produce {
        for (x in 1..10) {
            channel.send(x)
        }
    }

    oneToTen.consumeEach {
        println(it)
    }
    println("Done!")
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
1
2
3
4
5
6
7
8
9
10
Done!

예제를 분석해보자.

produce 함수를 사용하여 코루틴을 만들고 채널을 제공하고 있음을 알 수 있고, consumeEach 함수를 사용하여 생산되어 채널로 송신된 데이터를 수신받아 출력하고 있음을 확인할 수 있다.

produce 함수의 선언부는 아래와 같다.

1
2
3
4
5
@ExperimentalCoroutinesApi
fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
block: suspend ProducerScope<E>.() -> Unit): ReceiveChannel<E>

produce를 사용하면 ProducerScope를 상속받은 ProducerCoroutine을 얻게 된다.

ProducerScopeCoroutineScopeSendChannel 인터페이스를 상속받아 코루틴 컨텍스트와 몇몇 채널 인터페이스를 사용할 수 있는 스코프이다.

참고 Coroutines basics 포스팅 runBlockingAbstractCoroutine를 상속받은 BlockingCoroutine을 사용한다.