2 분 소요

kotlin을 활용한 프로젝트라면 일부 작업을 코루틴으로 처리하게끔 작성할 일이 있을 것이다.
내가 접한 케이스는 loop를 돌 때마다 코루틴을 생성하면서 비동기로 처리하는 로직이었는데 이 루프 카운트가 많아지면서 너무 많은
코루틴이 생성되는 문제가 예상되는 환경이었다.

이를 개선하기 위해 동작 가능한 최대 코루틴 개수를 정해줄 필요가 있었고, 세마포어를 적용해 개선하기로 했다!
세마포어가 뭔지 알아보고, 적용할 수 있는 코드는 어떤게 있을지 궁금해서 찾아본 뒤 작성하게 됐다!

개념

Semaphore

세마포어는 여러 프로세스 또는 스레드가 임계 영역(critical section)에 동시에 접근할 수 있도록 하는 방식이다.
편의상 스레드라고 부르면, 스레드는 임계 영역에 접근하기 위해 key를 얻어야 한다. 세마포어는 이 key 개수를 사전에 설정할 수 있으며, 만약 key 개수가 3개라면 동시에 3개의 스레드까지 접근해 비동기로 동작할 수 있는 것이다. 아래 사진을 참고하자.

가운데 블럭이 세마포어고 오른쪽 블럭이 실행되는 영역이라고 하면,
스레드 A~E 총 5개가 동시에 요청이 왔을 때 세마포어는 3개 스레드까지만 받을 수 있어서 일단 B, C, E를 받는다.

?


B, C, E는 실행하도록 하고, 나머지 스레드 A, D는 세마포어가 이미 3개 스레드(B, C, E)를 관리하고 있으므로 key를 받을 때 까지 대기한다. 이때 대기하는 스레드 A, D가 실행중인 스레드 B, C, E를 강제로 종료하는 등의 동작은 할 수 없다.

?


스레드 B, C, E는 동작을 완료하면 사용한 key를 세마포어에 넘긴다. 세마포어는 새로운 스레드를 실행하기 위해 대기중인 스레드를 조회하고 있다면 실행한다.

?


이런 방식으로 동작하게 되는데, 참고로 kotlin 세마포어는 프로세스, 스레드뿐만 아니라 코루틴도 관리한다!
그리고 위에서 3개라고 예를 들었던 세마포에에서 동작 가능한 스레드 최대 개수를 permits라 부른다.
kotlin에서는 이렇게 초기화할 수 있다!

val semaphore = Semaphore(3)


Mutex

Semaphore with permits = 1 is essentially a Mutex.

세마포어가 여러 스레드를 임계 영역에서 동작할 수 있도록 돕는다면, 뮤텍스는 단 1개의 스레드만 동작하게한다.
세마포어에서 permits1로 설정한 것이라 생각하면 된다.

적용하기

세마포어에서 key를 얻을 땐 acquire()를, 원하는 동작을 다 마쳐서 key를 반환하고 싶을 땐 release()를 사용한다.

fun semaphoreTest() {
    val semaphore = Semaphore(3)
    semaphore.acquire() // key 획득
    // 하고싶은 동작
    semaphore.release() // key 반환
}

위처럼 3개 코루틴까지 동작할 수 있는데, 요청이 한번에 8개가 왔다고 해보자! 로그를 남겨보면 어떻게 될까?

suspend fun main() {
    val semaphore = Semaphore(3)
    val jobs = mutableListOf<Job>()

    repeat(8) { 
        CoroutineScope(Dispatchers.IO).launch {
            semaphore.acquire() // key 획득

            println("코루틴 ${Thread.currentThread().name} 시작")
            Thread.sleep(2000)
            println("코루틴 ${Thread.currentThread().name} 끝")

            semaphore.release() // key 반환
        }.also {
            jobs.add(it)
        }
    }

    jobs.forEach { it.join() }
}


결과는 아래와 같다.
코루틴 3, 1, 2로 시작했다가 코루틴 2가 끝나고, 한 자리가 비어서 대기하던 코루틴 6이 시작된다.
항상 설정해준 permits = 3을 넘지 않게 최대 3개의 코루틴만 비동기로 동작하게 된다.

코루틴 DefaultDispatcher-worker-3 시작
코루틴 DefaultDispatcher-worker-1 시작
코루틴 DefaultDispatcher-worker-2 시작
코루틴 DefaultDispatcher-worker-2 끝
코루틴 DefaultDispatcher-worker-6 시작
코루틴 DefaultDispatcher-worker-3 끝
코루틴 DefaultDispatcher-worker-5 시작
코루틴 DefaultDispatcher-worker-1 끝
코루틴 DefaultDispatcher-worker-4 시작
코루틴 DefaultDispatcher-worker-6 끝
코루틴 DefaultDispatcher-worker-10 시작
...


참고로 Semaphore.withPermit을 쓰면 acquire(), release()를 직접 쓰지 않아도 된다.

semaphore.withPermit {
    println("코루틴 ${Thread.currentThread().name} 시작")
    Thread.sleep(2000)
    println("코루틴 ${Thread.currentThread().name} 끝")
}

내부적으로 try에서 메인 로직 수행하고, finally에 release()를 호출해서 key를 반환해주고 있기 때문이다!

// Semaphore.kt
public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
    contract {
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }

    acquire()
    try {
        return action() // 메인 로직 호출
    } finally {
        release()
    }
}

References

카테고리:

업데이트:

댓글남기기