(Kotlin Coroutines) 003. Job

Kotlin Coroutines - Job

이전 포스팅에서 다루었던 coroutineScope 예제를 다시 한 번보자.

import kotlinx.coroutines.*

suspend fun doSomething() = coroutineScope {
    launch {
        println("launch: ${Thread.currentThread().name}")
        delay(100L)
    }
    println("coroutineScope")
    println("coroutineScope: ${Thread.currentThread().name}")
}

fun main() = runBlocking {
    doSomething()
}

coroutineScope으로 처리하게 되면 반환값이 필요해지므로 println을 추가해 Unit 타입으로 만들어주었다.

만약 println이 없다면 무엇을 반환하고 있길래 오류가 발생하는걸까?

1. Job

아래와 같이 코드를 작성하여 반환 타입을 확인해보자.

import kotlinx.coroutines.*

suspend fun doSomething() = coroutineScope {
    launch {
        delay(100L)
    }
}

fun main() = runBlocking {
    val result = doSomething()
    println("return type : ${result::class.simpleName}")
}

출력 결과

1
2
launch: main @coroutine#2
StandaloneCoroutine

launch 블럭이 StandaloneCoroutine을 반환하고 있음을 확인했다.

StandaloneCoroutine의 구현체를 확인해보면 BlockingCoroutine과 마찬가지로 AbstractCoroutine이 슈퍼클래스임을 확인할 수 있다.

1
2
3
4
5
6
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
// ...
}

AbstractCoroutineJobSupport를 상속받고 있는데, JobSupport의 주석엔 아래와 같은 글귀가 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* A concrete implementation of [Job]. It is optionally a child to a parent job.
*
* This is an open class designed for extension by more specific classes that might augment the
* state and mare store addition state information for completed jobs, like their result values.
*
* @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
* @suppress **This is unstable API and it is subject to change.**
*/
@Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
// ...
}

주석 제일 첫 문장을 주목해보자.

A concrete implementation of [Job], Job의 구체적인 구현체라고 적혀있다.

바로 위에서 확인했듯 launch 블록으로 작성된 코루틴 빌더가 반환하는 객체를 Job이라고 한다.

Job 객체를 이용하면 코루틴을 기다리게하거나 취소하게하는 등의 제어가 가능하다.

예제를 수정하여 파악해보자.

import kotlinx.coroutines.*

suspend fun doSomething() = coroutineScope {
    val job = launch {
        delay(1000L)
        println("launch 1")
    }
    job.join()
    launch {
        println("launch 2")
    }
    println("coroutineScope")
}

fun main() = runBlocking {
    doSomething()
}

출력 결과

1
2
3
launch 1
coroutineScope
launch 2

첫 번째 launch에서 반환한 Job 객체에 join을 걸었기때문에, 다른 코루틴에게 스레드를 양보하지않고 끝날때까지 점유한다.

이후 “coroutineScope”를 출력한 뒤 큐에 들어있는 두 번째 launch 블록을 수행하고 종료된다.

만약 join을 호출하지않았다면 아래와 같은 결과가 나오게 될 것이다.

출력 결과

1
2
3
coroutineScope
launch 2
launch 1

먼저 가장 외곽의 블록의 코드인 “coroutineScope”를 출력한 뒤,

첫 번째 launch에서 만난 delay로 1초간 스레드를 양보하여 두 번째 launch가 수행된 후, 다시 재개하여 “launch 1”을 출력하고 종료되었다.

2. cancel

코루틴 빌더가 반환하는 Job 객체를 이용하면 코루틴을 취소할 수 있다.

아래 예제를 보자.

import kotlinx.coroutines.*

suspend fun doSomething() = coroutineScope {
    val job1 = launch {
        println("launch1 : Start")
        delay(1000L)
        println("launch1 : Finish")
    }

    val job2 = launch {
        println("launch2 : Start")
        println("launch2 : Finish")
    }

    val job3 = launch {
        println("launch3 : Start")
        delay(500L)
        println("launch3 : Finish")
    }

    delay(800L)
    job1.cancel()
    job2.cancel()
    job3.cancel()
    println("coroutineScope : Finish")
}

fun main() = runBlocking {
    doSomething()
}

출력 결과

1
2
3
4
5
6
launch1 : Start
launch2 : Start
launch2 : Finish
launch3 : Start
launch3 : Finish
coroutineScope : Finish

최초에 delay(800L)을 호출한 뒤 모든 job 객체들을 취소하고 있다.

0.8초만 지연된 후 종료되때문에 지연없이 수행이 완료되는 job2와 0.5초의 지연이라서 0.8초 안에 들어올 것이라 예상되는 job3도 수행이 완료될 것이다.

다만 1초의 지연시간을 가진 job1은 Finish를 출력하지못하고 취소되었다.

3. Irrevocability Job

모든 Job 객체가 취소가 가능한 것은 아니다.

아래 코드는 취소가 불가능한 Job의 예제 코드이다.

import kotlinx.coroutines.*

suspend fun counter() = coroutineScope {
    val job1 = launch(Dispatchers.Default) {
        var i = 1
        var nextTime = System.currentTimeMillis()

        while (i in 1 .. 10) {
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextTime) {
                println(i)
                nextTime = currentTime + 100L
                i++
            }
        }
    }
    
    delay(200L)
    job1.cancel()
    println("finish")
}

fun main() = runBlocking {
    counter()
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
1
2
3
finish
4
5
6
7
8
9
10

job1.cancel()로 작업을 취소하려고 했지만 “finish”가 출력된 이후로도 계속해서 작업이 수행되었음을 확인할 수 있다.

이유는 launch 블록을 만들때 정의된 Dispatchers.Default 때문이다.

Dispatchers.Default는 적용된 코루틴의 블럭을 JVM의 공유 스레드풀에 존재하는 별도의 스레드에서 수행시킨다.

때문에 작업의 취소라는 개념 자체가 없어 취소할 수 없는 것이다.

모든 코루틴 작업을 취소하기 위해선 별도의 작업이 필요하다.

4. join after cancel

위의 예제를 다시 보자.

import kotlinx.coroutines.*

suspend fun counter() = coroutineScope {
    val job1 = launch(Dispatchers.Default) {
        var i = 1
        var nextTime = System.currentTimeMillis()

        while (i in 1 .. 10) {
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextTime) {
                println(i)
                nextTime = currentTime + 100L
                i++
            }
        }
    }
    
    delay(200L)
    job1.cancel()
    println("finish")
}

fun main() = runBlocking {
    counter()
}

작업이 취소되었든 무사히 완료되었든 “finish”를 출력하고 싶다면 어떻게 해야할까?

작업이 완료되기를 기다리는 join을 써서 해결할 수 있다.

아래와 같이 cancel 다음에 join을 추가해보자.

import kotlinx.coroutines.*

suspend fun counter() = coroutineScope {
    val job1 = launch(Dispatchers.Default) {
        var i = 1
        var nextTime = System.currentTimeMillis()

        while (i in 1 .. 10) {
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextTime) {
                println(i)
                nextTime = currentTime + 100L
                i++
            }
        }
    }
    
    delay(200L)
    job1.cancel()
    job1.join()
    println("finish")
}

fun main() = runBlocking {
    counter()
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
1
2
3
4
5
6
7
8
9
10
finish

실행해보면 모든 작업이 완료된 후 “finish”가 출력되었음을 확인할 수 있다.

5. cancelAndJoin

코루틴에서 cancel을 호출하고 join을 호출하는 방식은 매우 흔한 일이다.

그래서 코루틴은 두 가지 작업을 한 번에 처리할 수 있는 함수를 제공한다.

1
2
3
4
5
6
// AS-IS
job.cancel()
job.join()

// TO-BE
job.cancelAndJoin()
import kotlinx.coroutines.*

suspend fun counter() = coroutineScope {
    val job1 = launch(Dispatchers.Default) {
        var i = 1
        var nextTime = System.currentTimeMillis()

        while (i in 1 .. 10) {
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextTime) {
                println(i)
                nextTime = currentTime + 100L
                i++
            }
        }
    }
    
    delay(200L)
    // job1.cancel()
    // job1.join()
    job1.cancelAndJoin() // HERE
    println("finish")
}

fun main() = runBlocking {
    counter()
}

출력 결과

1
2
3
4
5
6
7
8
9
10
11
1
2
3
4
5
6
7
8
9
10
finish

따로 호출한 것과 한 번에 호출한 것의 결과가 동일한 것을 확인할 수 있다.

6. Cancelable coroutines

코루틴이 별도의 스레드에서 수행중이라고 해서 취소를 못하는 경우보다

코루틴의 모든 제어권을 가지고 있는 것이 더욱 안전하다.

이때 isActive 속성을 확인하여 현재 코루틴의 활성화 여부를 판정할 수 있다.

기존 예제에 isActive를 추가해보자.

import kotlinx.coroutines.*

suspend fun counter() = coroutineScope {
    val job1 = launch(Dispatchers.Default) {
        var i = 1
        var nextTime = System.currentTimeMillis()

        while (i in 1 .. 10 && isActive) { // HERE
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextTime) {
                println(i)
                nextTime = currentTime + 100L
                i++
            }
        }
    }
    
    delay(200L)
    job1.cancelAndJoin()š
    println("finish")
}

fun main() = runBlocking {
    counter()
}

출력 결과

1
2
3
1
2
finish

코루틴의 활성화 여부를 이용하여 간단하게 처리할 수 있음을 확인하였다.

7. try-catch-finally

단순한 연산이라면 상관없겠지만 만약 코루틴 내에서 자원을 할당 받은 경우엔 후처리가 필요하다.

여기서 자원이란 소켓 통신이나 파일의 입출력, 데이터베이스 커넥션 등 외부 자원을 활용하는 경우를 뜻한다.

만약 launch에서 자원을 할당 받았다면 이를 해제해주는 지점이 있어야할 것이다.

이때 finally를 사용할 수 있다.

import kotlinx.coroutines.*

suspend fun doSomething() = coroutineScope {
    val job1 = launch(Dispatchers.Default) {
        try {
            println("launch1 : ${Thread.currentThread().name}")
            delay(1000L)
            println("launch1 : finish")
        } finally {
            println("launch1 : finally")
        }
    }

    val job2 = launch(Dispatchers.Default) {
        try {
            println("launch2 : ${Thread.currentThread().name}")
            delay(500L)
            println("launch2 : finish")
        } finally {
            println("launch2  : finally")
        }
    }

    delay(200L)
    job1.cancel()
    job2.cancel()
}

fun main() = runBlocking {
    doSomething()
}

출력 결과

1
2
3
4
launch1 : DefaultDispatcher-worker-1 @coroutine#2
launch2 : DefaultDispatcher-worker-2 @coroutine#3
launch1 : finally
launch2 : finally

job1job2가 모두 취소되었지만, finally 블록 내의 코드는 호출된 것을 확인할 수 있다.

8. NonCancellable

어떤 코드는 정말 중요한 작업이라 절대 취소를 하지 말아야할 수 있다.

취소를 불가능하게 하려면 withContext(NonCancellable)을 이용해 작업하면 된다.

import kotlinx.coroutines.*

suspend fun doSomething() = coroutineScope {
    val job1 = launch(Dispatchers.Default) {
        withContext(NonCancellable) {
            println("launch1 : ${Thread.currentThread().name}")
            delay(1000L)
            println("launch1 : NonCancellable")
        }
        delay(1000L)
        println("launch1 : finish")
    }

    val job2 = launch(Dispatchers.Default) {
        withContext(NonCancellable) {
            println("launch2 : ${Thread.currentThread().name}")
            delay(1000L)
            println("launch2 : NonCancellable")
        }
        delay(1000L)
        println("launch2 : finish")
    }

    delay(200L)
    job1.cancel()
    job2.cancel()
}

fun main() = runBlocking {
    doSomething()
}

출력 결과

1
2
3
4
launch1 : DefaultDispatcher-worker-1 @coroutine#2
launch2 : DefaultDispatcher-worker-2 @coroutine#3
launch1 : NonCancellable
launch2 : NonCancellable

job1job2가 모두 취소되어 “finish”는 출력되지 않았지만, withContext(NonCancellable)이 적용된 블록은 모두 실행되었음을 확인할 수 있다.

finally와 결합시켜 무조건 취소되지않고 회수해야하는 자원이 있는 경우 안전하게 회수할 수 있다.

9. withTimeout

앞서 언급했듯 코루틴은 계층적 구조로 모든 내부 동작이 끝날때까지 기다린다.

하지만 Api call처럼 일정 시간이 지난 후 종료처리를 하고 싶은 경우 withTimeout을 사용할 수 있다.

import kotlinx.coroutines.*

suspend fun doSomething() = coroutineScope {
    val job1 = launch(Dispatchers.Default) {
        println("launch1 : ${Thread.currentThread().name}")
        delay(1000L)
        println("launch1 : finish")
    }

    val job2 = launch(Dispatchers.Default) {
        println("launch2 : ${Thread.currentThread().name}")
        delay(1000L)
        println("launch2 : finish")
    }
}

fun main() = runBlocking {
    withTimeout(500L) {
        doSomething()
    }
}

출력 결과

1
2
3
4
5
6
7
8
9
10
launch1 : DefaultDispatcher-worker-2 @coroutine#2
launch2 : DefaultDispatcher-worker-1 @coroutine#3
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 500 ms
at (Coroutine boundary. (:-1)
at FileKt$main$1$1.invokeSuspend (File.kt:-1)
at FileKt$main$1.invokeSuspend (File.kt:-1)
Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 500 ms
at kotlinx.coroutines.TimeoutKt .TimeoutCancellationException(Timeout.kt:184)
at kotlinx.coroutines.TimeoutCoroutine .run(Timeout.kt:154)
at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask .run(EventLoop.common.kt:502)

0.5초 이후 코루틴이 타임아웃 처리되어 “finish”를 출력하지 못하였다.

이때 출력 화면과 같이 TimeoutCancellationException이 발생하게 된다.

10. withTimeoutOrNull

타임아웃시 발생한 TimeoutCancellationException을 처리하기 위해 try-catch를 계속 사용하는 것은 매우 귀찮은 일이다.

이를 간소화하기 위해 코루틴은 withTimeoutOrNull을 제공해준다.

withTimeoutOrNullTimeoutCancellationException 발생시 null을 반환해준다.

import kotlinx.coroutines.*

suspend fun doSomething() = coroutineScope {
    val job1 = launch(Dispatchers.Default) {
        println("launch1 : ${Thread.currentThread().name}")
        delay(1000L)
        println("launch1 : finish")
    }

    val job2 = launch(Dispatchers.Default) {
        println("launch2 : ${Thread.currentThread().name}")
        delay(1000L)
        println("launch2 : finish")
    }
}

fun main() = runBlocking {
    val result = withTimeoutOrNull(500L) {
        doSomething()
        true
    } ?: false
    println(result)
}

출력 결과

1
2
3
launch1 : DefaultDispatcher-worker-1 @coroutine#2
launch2 : DefaultDispatcher-worker-2 @coroutine#3
false

whithTimeoutOrNull을 통해 성공적으로 코루틴 작업을 마쳤으면 true, 중간에 타임아웃으로 인해 null이 발생했다면 엘비스 연산자를 이용해 false를 반환하도록 적용하였다.

타임아웃이 발생하게끔 지연시킨 코드이므로 의도대로 false를 출력하는 것을 확인할 수 있다.


References