[Kotlin] Semaphore 적용해보기
kotlin을 활용한 프로젝트라면 일부 작업을 코루틴으로 처리하게끔 작성할 일이 있을 것이다.
내가 접한 케이스는 loop를 돌 때마다 코루틴을 생성하면서 비동기로 처리하는 로직이었는데 이 루프 카운트가 많아지면서 너무 많은
코루틴이 생성되는 문제가 예상되는 환경이었다.
이를 개선하기 위해 동작 가능한 최대 코루틴 개수를 정해줄 필요가 있었고, 세마포어를 적용해 개선하기로 했다!
세마포어가 뭔지 알아보고, 적용할 수 있는 코드는 어떤게 있을지 궁금해서 찾아본 뒤 작성하게 됐다!
개념
Semaphore
세마포어는 여러 프로세스 또는 스레드가 임계 영역(critical section)에 동시에 접근할 수 있도록 하는 방식이다.
편의상 스레드라고 부르면, 스레드는 임계 영역에 접근하기 위해 key를 얻어야 한다. 세마포어는 이 key 개수를 사전에 설정할 수
있으며, 만약 key 개수가 3개
라면 동시에 3개의 스레드까지 접근해 비동기로 동작할 수 있는 것이다. 아래 사진을 참고하자.
가운데 블럭이 세마포어고 오른쪽 블럭이 실행되는 영역이라고 하면,
스레드 A~E 총 5개가 동시에 요청이 왔을 때 세마포어는 3개 스레드까지만 받을 수 있어서 일단 B, C, E를 받는다.
B, C, E는 실행하도록 하고, 나머지 스레드 A, D는 세마포어가 이미 3개 스레드(B, C, E)를 관리하고 있으므로 key를 받을 때
까지 대기한다. 이때 대기하는 스레드 A, D가 실행중인 스레드 B, C, E를 강제로 종료하는 등의 동작은 할 수 없다.
스레드 B, C, E는 동작을 완료하면 사용한 key를 세마포어에 넘긴다. 세마포어는 새로운 스레드를 실행하기 위해 대기중인 스레드를
조회하고 있다면 실행한다.
이런 방식으로 동작하게 되는데, 참고로 kotlin 세마포어는 프로세스, 스레드뿐만 아니라 코루틴도 관리한다!
그리고 위에서 3개라고 예를 들었던 세마포에에서 동작 가능한 스레드 최대 개수를 permits라 부른다.
kotlin에서는 이렇게 초기화할 수 있다!
val semaphore = Semaphore(3)
Mutex
Semaphore with permits = 1 is essentially a Mutex.
세마포어가 여러 스레드
를 임계 영역에서 동작할 수 있도록 돕는다면, 뮤텍스는 단 1개의 스레드만 동작하게한다.
세마포어에서 permits
를 1로 설정한 것이라 생각하면 된다.
적용하기
세마포어에서 key를 얻을 땐 acquire()를, 원하는 동작을 다 마쳐서 key를 반환하고 싶을 땐 release()를 사용한다.
fun semaphoreTest() {
val semaphore = Semaphore(3)
semaphore.acquire() // key 획득
// 하고싶은 동작
semaphore.release() // key 반환
}
위처럼 3개 코루틴까지 동작할 수 있는데, 요청이 한번에 8개가 왔다고 해보자! 로그를 남겨보면 어떻게 될까?
suspend fun main() {
val semaphore = Semaphore(3)
val jobs = mutableListOf<Job>()
repeat(8) {
CoroutineScope(Dispatchers.IO).launch {
semaphore.acquire() // key 획득
println("코루틴 ${Thread.currentThread().name} 시작")
Thread.sleep(2000)
println("코루틴 ${Thread.currentThread().name} 끝")
semaphore.release() // key 반환
}.also {
jobs.add(it)
}
}
jobs.forEach { it.join() }
}
결과는 아래와 같다.
코루틴 3, 1, 2
로 시작했다가 코루틴 2
가 끝나고, 한 자리가 비어서 대기하던 코루틴 6
이 시작된다.
항상 설정해준 permits = 3
을 넘지 않게 최대 3개의 코루틴만 비동기로 동작하게 된다.
코루틴 DefaultDispatcher-worker-3 시작
코루틴 DefaultDispatcher-worker-1 시작
코루틴 DefaultDispatcher-worker-2 시작
코루틴 DefaultDispatcher-worker-2 끝
코루틴 DefaultDispatcher-worker-6 시작
코루틴 DefaultDispatcher-worker-3 끝
코루틴 DefaultDispatcher-worker-5 시작
코루틴 DefaultDispatcher-worker-1 끝
코루틴 DefaultDispatcher-worker-4 시작
코루틴 DefaultDispatcher-worker-6 끝
코루틴 DefaultDispatcher-worker-10 시작
...
참고로 Semaphore.withPermit을 쓰면 acquire()
, release()
를 직접 쓰지 않아도 된다.
semaphore.withPermit {
println("코루틴 ${Thread.currentThread().name} 시작")
Thread.sleep(2000)
println("코루틴 ${Thread.currentThread().name} 끝")
}
내부적으로 try에서 메인 로직 수행하고, finally에 release()
를 호출해서 key를 반환해주고 있기 때문이다!
// Semaphore.kt
public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
}
acquire()
try {
return action() // 메인 로직 호출
} finally {
release()
}
}
댓글남기기