(Kotlin Coroutines) 015. Channel pipelines

Kotlin Coroutines - Channel pipelines

이번 포스팅에서는 파이프라인을 이용해 두 개 이상의 채널을 연결하여 사용하는 법에 대해서 알아보자.

파이프라인 또한 앞서 다루었던 생산자&소비자 패턴처럼 일반적인 패턴이다.

참고 Java Design Patterns - Pipeline

1. pipeline

코루틴에서의 파이프라인은 하나의 스트림을 프로듀서가 만들고, 다른 코루틴에서 해당 스트림을 읽어내어 새로운 스트림을 만드는 패턴이다.

아래 예제처럼 CoroutineScope.{FUNCTION_NAME} 형식으로 함수를 만들어내면 함수의 코드 블럭내에서는 별도 코루틴 스코프의 명시 없이도 동일하게 처리할 수 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) {
        send(x++) // infinite stream of integers starting from 1
    }
}

fun CoroutineScope.produceStringNumbers(numbers: ReceiveChannel<Int>): ReceiveChannel<String> = produce {
    for (i in numbers) {
        send("${i}!")
    }
}

fun main() = runBlocking<Unit> {
    val numbers = produceNumbers() // // produces integers from 1 and on
    val stringNumbers = produceStringNumbers(numbers)

    repeat(5) {
        println(stringNumbers.receive()) // // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
}

출력 결과

1
2
3
4
5
6
1!
2!
3!
4!
5!
Done!

코드 분석을 해보자.

produceNumbers()는 1부터 순차적으로 1씩 증가된 정수를 생산한다.

produceStringNumbers()는 입력된 채널로부터 스트리밍된 값을 ${i}!의 문자열로 출력한다.

여기서 쓰인 ReceiveChannel은 지정된 채널로부터 값을 넘겨 받는 다는 뜻이다.

반환 타입도 살펴보면 produce를 통해 새로운 스트림을 만들어내는 것을 확인할 수 있다.

특이점으로는 close가 없으므로 for 루프로 수신할 수 없기때문에 명시적인 순회 횟수를 가진 repeat를 쓴 것을 들 수 있다.

2. 파이프라인을 이용한 홀수 필터링(Odd numbers with pipeline)

파이프라인을 이용해 홀수만 출력하는 코드를 작성해보자.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) {
        send(x++)
    }
}

fun CoroutineScope.filterOdd(numbers: ReceiveChannel<Int>): ReceiveChannel<String> = produce {
    for (i in numbers) {
        if (i % 2 == 1) {
            send("${i}!")
        }
    }
}


fun main() = runBlocking<Unit> {
    val numbers = produceNumbers()
    val oddNumbers = filterOdd(numbers)

    repeat(10) {
        println(oddNumbers.receive())
    }
    println("Done!")
    coroutineContext.cancelChildren()
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
1!
3!
5!
7!
9!
11!
13!
15!
17!
19!
완료

produceNumbers()produce 함수가 만들어낸 ReceiveChannel을 반환한다.

이를 filterOdd()에 넘겨 새로운 ReceiveChannel을 만들어낸다.

3. 파이프라인을 이용한 소수 필터링(Prime numbers with pipeline)

이번엔 연속으로 파이프라인을 연결하여 소수(prime number)를 출력해보자.

이번에도 produce 함수를 두 개 사용하지만, repeat 안에서 receive를 호출하여 새로운 채널을 계속 만들어낼 것이다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) {
        send(x++)
    }
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int): ReceiveChannel<Int> = produce {
    for (i in numbers) {
        if (i % prime != 0) {
            send(i)
        }
    }
}


fun main() = runBlocking<Unit> {
    var numbers = numbersFrom(2)

    repeat(10) {
        val prime = numbers.receive()
        println(prime)
        numbers = filter(numbers, prime)
    }
    println("Done!")
    coroutineContext.cancelChildren()
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
2
3
5
7
11
13
17
19
23
29
Done!

numbersFrom() 함수를 호출하면서 2를 파라미터로 넘겨준다.

따라서 2, 3, 4 순으로 데이터를 생산한다.

이후 repeat 안에서 receive 함수를 호출해 값을 하나씩 받아온다.

이후 filter()를 호출해 계속해서 채널을 생성해낸다.