728x90

kotlin 에서 멀티쓰레드 및 코루틴 환경에서 자원을 공유 하기위해서는 보통 잘알려진 방법으로 는 Mutex 로 context 의 Lock 을 걸어 자원을 읽고 쓰는 동안 그 자원에 대해서 접근하지 못하도록 하는 방법이 있다.

 

Mutex

private suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // 시작할 코루틴의 갯수
    val k = 1000 // 코루틴 내에서 반복할 횟수
    val elapsed = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }

        }
    }
    println("$elapsed ms 동안 ${n * k}개의 액션을 수행했습니다.!")
}

//@Volatile // 가시성 문제만 해결할뿐 동시에 읽고 수정해서 쓰는 문제는 해결되지 않는다.
val mutex = Mutex()
private var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

 

kotlin 에서는 코루틴을 사용하기 위해서는 함수 앞에 suspend 가 붙어야 한다. coroutineScope 를 활용해서 n * m 번 만큼 

count 를 증가시킨다.

100 * 1000 의 값을 출력하기 위해서 mutex 를 활용하여 counter++ 하는동안 자원을 lock 한다.

 

다음은 조금 생소한 개념인 Actor 를 활용하여 동일하게 counter 를 증가시킨다.

 

Actor

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // 액터 안에 상태를 캡슐화 해두고 다른 코로틴이 접근하지 못하도록

    for (msg in channel) { // 외부에서 보내는 것은 채널을 통해서만 받을 수 있따.
        when (msg) {
            is IncCounter -> counter++ // 증가시키는 신호.
            is GetCounter -> msg.response.complete(counter) //현재 상태를 반환
        }
    }
}

private suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // 시작할 코루틴의 갯수
    val k = 1000 // 코루틴 내에서 반복할 횟수
    val elapsed = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }

        }
    }
    println("$elapsed ms 동안 ${n * k}개의 액션을 수행했습니다.!")
}

private var counter = 0

fun main() = runBlocking<Unit> {
    val counter = counterActor()
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter) // suspension point
        }
    }
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response)) // suspension point
    println("Counter = ${response.await()}") // suspension point
    counter.close()
}

 

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // 액터 안에 상태를 캡슐화 해두고 다른 코로틴이 접근하지 못하도록

    for (msg in channel) { // 외부에서 보내는 것은 채널을 통해서만 받을 수 있따.
        when (msg) {
            is IncCounter -> counter++ // 증가시키는 신호.
            is GetCounter -> msg.response.complete(counter) //현재 상태를 반환
        }
    }
}

 

CompletableDeferred 란?

https://kotlinworld.com/149

 

[Coroutine] 10. Deferred를 이용한 결과값 수신

Deferred란 Deferred는 직역하면 연기라는 뜻을 가진다. "결과값 수신을 연기한다"라는 뜻인데, 이는 미래의 어느 시점에 결과값이 올 것을 뜻한다. Deferred의 의미와 같이 "Deferred는 결과값을 수신하는

kotlinworld.com

 

CompletableDefered<Int> 란 결과값을 Int형으로 수신하겠다는 의미다.

for (msg in channel) 로 메세지를 채널로 수신하고 있다가 메세지가 들어오면 when 절로 처리하는 것이다.

 

val counter = counterActor()
withContext(Dispatchers.Default) {
    massiveRun {
        counter.send(IncCounter) // suspension point
    }
}
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response)) // suspension point
println("Counter = ${response.await()}") // suspension point
counter.close()

 

counterActor 를 생성하여 직접적으로 counter 를 증가시키지 않고 send method를 활용하여 값을 증가시킨다.

send(IncCounter) 는 suspension point 이기 때문에 잠들어 있다가 로직이 다 수행되면 깨어난다.

 

counter.send(GetCounter(response)) 로 값을 수신하고 response.await() 로 값을 받아온다.

 

actor 좀더 자세하게 알아보기

https://origogi.github.io/coroutine/%EC%95%A1%ED%84%B0/

 

[Coroutine] 액터(Actor)

 

origogi.github.io

 

728x90