Kotlin Coroutines - Channel pipelines
이번 포스팅에서는 파이프라인을 이용해 두 개 이상의 채널을 연결하여 사용하는 법에 대해서 알아보자.
파이프라인 또한 앞서 다루었던 생산자&소비자 패턴처럼 일반적인 패턴이다.
1. pipeline
코루틴에서의 파이프라인은 하나의 스트림을 프로듀서가 만들고, 다른 코루틴에서 해당 스트림을 읽어내어 새로운 스트림을 만드는 패턴이다.
아래 예제처럼 CoroutineScope.{FUNCTION_NAME}
형식으로 함수를 만들어내면 함수의 코드 블럭내에서는 별도 코루틴 스코프의 명시 없이도 동일하게 처리할 수 있다.
import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) { send(x++) // infinite stream of integers starting from 1 } } fun CoroutineScope.produceStringNumbers(numbers: ReceiveChannel<Int>): ReceiveChannel<String> = produce { for (i in numbers) { send("${i}!") } } fun main() = runBlocking<Unit> { val numbers = produceNumbers() // // produces integers from 1 and on val stringNumbers = produceStringNumbers(numbers) repeat(5) { println(stringNumbers.receive()) // // print first five } println("Done!") // we are done coroutineContext.cancelChildren() // cancel children coroutines }
출력 결과
1 | 1! |
코드 분석을 해보자.
produceNumbers()
는 1부터 순차적으로 1씩 증가된 정수를 생산한다.
produceStringNumbers()
는 입력된 채널로부터 스트리밍된 값을 ${i}!
의 문자열로 출력한다.
여기서 쓰인 ReceiveChannel
은 지정된 채널로부터 값을 넘겨 받는 다는 뜻이다.
반환 타입도 살펴보면 produce
를 통해 새로운 스트림을 만들어내는 것을 확인할 수 있다.
특이점으로는 close
가 없으므로 for
루프로 수신할 수 없기때문에 명시적인 순회 횟수를 가진 repeat
를 쓴 것을 들 수 있다.
2. 파이프라인을 이용한 홀수 필터링(Odd numbers with pipeline)
파이프라인을 이용해 홀수만 출력하는 코드를 작성해보자.
import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) { send(x++) } } fun CoroutineScope.filterOdd(numbers: ReceiveChannel<Int>): ReceiveChannel<String> = produce { for (i in numbers) { if (i % 2 == 1) { send("${i}!") } } } fun main() = runBlocking<Unit> { val numbers = produceNumbers() val oddNumbers = filterOdd(numbers) repeat(10) { println(oddNumbers.receive()) } println("Done!") coroutineContext.cancelChildren() }
출력 결과
1 | 1! |
produceNumbers()
는 produce
함수가 만들어낸 ReceiveChannel
을 반환한다.
이를 filterOdd()
에 넘겨 새로운 ReceiveChannel
을 만들어낸다.
3. 파이프라인을 이용한 소수 필터링(Prime numbers with pipeline)
이번엔 연속으로 파이프라인을 연결하여 소수(prime number)를 출력해보자.
이번에도 produce
함수를 두 개 사용하지만, repeat
안에서 receive
를 호출하여 새로운 채널을 계속 만들어낼 것이다.
import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { var x = start while (true) { send(x++) } } fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int): ReceiveChannel<Int> = produce { for (i in numbers) { if (i % prime != 0) { send(i) } } } fun main() = runBlocking<Unit> { var numbers = numbersFrom(2) repeat(10) { val prime = numbers.receive() println(prime) numbers = filter(numbers, prime) } println("Done!") coroutineContext.cancelChildren() }
출력 결과
1 | 2 |
numbersFrom()
함수를 호출하면서 2를 파라미터로 넘겨준다.
따라서 2, 3, 4 순으로 데이터를 생산한다.
이후 repeat
안에서 receive
함수를 호출해 값을 하나씩 받아온다.
이후 filter()
를 호출해 계속해서 채널을 생성해낸다.