kotlin coroutine - Mutex 와 Actor (공유자원)
kotlin 에서 멀티쓰레드 및 코루틴 환경에서 자원을 공유 하기위해서는 보통 잘알려진 방법으로 는 Mutex 로 context 의 Lock 을 걸어 자원을 읽고 쓰는 동안 그 자원에 대해서 접근하지 못하도록 하는 방법이 있다.
Mutex
private suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 시작할 코루틴의 갯수
val k = 1000 // 코루틴 내에서 반복할 횟수
val elapsed = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$elapsed ms 동안 ${n * k}개의 액션을 수행했습니다.!")
}
//@Volatile // 가시성 문제만 해결할뿐 동시에 읽고 수정해서 쓰는 문제는 해결되지 않는다.
val mutex = Mutex()
private var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
kotlin 에서는 코루틴을 사용하기 위해서는 함수 앞에 suspend 가 붙어야 한다. coroutineScope 를 활용해서 n * m 번 만큼
count 를 증가시킨다.
100 * 1000 의 값을 출력하기 위해서 mutex 를 활용하여 counter++ 하는동안 자원을 lock 한다.
다음은 조금 생소한 개념인 Actor 를 활용하여 동일하게 counter 를 증가시킨다.
Actor
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // 액터 안에 상태를 캡슐화 해두고 다른 코로틴이 접근하지 못하도록
for (msg in channel) { // 외부에서 보내는 것은 채널을 통해서만 받을 수 있따.
when (msg) {
is IncCounter -> counter++ // 증가시키는 신호.
is GetCounter -> msg.response.complete(counter) //현재 상태를 반환
}
}
}
private suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 시작할 코루틴의 갯수
val k = 1000 // 코루틴 내에서 반복할 횟수
val elapsed = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$elapsed ms 동안 ${n * k}개의 액션을 수행했습니다.!")
}
private var counter = 0
fun main() = runBlocking<Unit> {
val counter = counterActor()
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter) // suspension point
}
}
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response)) // suspension point
println("Counter = ${response.await()}") // suspension point
counter.close()
}
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // 액터 안에 상태를 캡슐화 해두고 다른 코로틴이 접근하지 못하도록
for (msg in channel) { // 외부에서 보내는 것은 채널을 통해서만 받을 수 있따.
when (msg) {
is IncCounter -> counter++ // 증가시키는 신호.
is GetCounter -> msg.response.complete(counter) //현재 상태를 반환
}
}
}
CompletableDeferred 란?
CompletableDefered<Int> 란 결과값을 Int형으로 수신하겠다는 의미다.
for (msg in channel) 로 메세지를 채널로 수신하고 있다가 메세지가 들어오면 when 절로 처리하는 것이다.
val counter = counterActor()
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter) // suspension point
}
}
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response)) // suspension point
println("Counter = ${response.await()}") // suspension point
counter.close()
counterActor 를 생성하여 직접적으로 counter 를 증가시키지 않고 send method를 활용하여 값을 증가시킨다.
send(IncCounter) 는 suspension point 이기 때문에 잠들어 있다가 로직이 다 수행되면 깨어난다.
counter.send(GetCounter(response)) 로 값을 수신하고 response.await() 로 값을 받아온다.
actor 좀더 자세하게 알아보기
https://origogi.github.io/coroutine/%EC%95%A1%ED%84%B0/
'Kotlin' 카테고리의 다른 글
How to use Redisson with kotlin (0) | 2022.10.02 |
---|---|
How to set up Security in WebFlux (0) | 2022.10.01 |
spring 에서 비동기 처리를 위한 TreadPoolTaskExecutor 알아보기 (0) | 2022.08.24 |
Kotlin - with 과 apply (0) | 2022.07.24 |
댓글
이 글 공유하기
다른 글
-
How to use Redisson with kotlin
How to use Redisson with kotlin
2022.10.02 -
How to set up Security in WebFlux
How to set up Security in WebFlux
2022.10.01 -
spring 에서 비동기 처리를 위한 TreadPoolTaskExecutor 알아보기
spring 에서 비동기 처리를 위한 TreadPoolTaskExecutor 알아보기
2022.08.24 -
Kotlin - with 과 apply
Kotlin - with 과 apply
2022.07.24