[Spring] @Async를 이용해 Listener에서 비동기로 동작하는 로직 작성하기
@EventListener
를 사용하든, @TransactionalEventListener
를 사용하든 동기로 동작하게 된다.
만약 반드시 동기로 동작할 필요가 없는 로직이라면 굳이 동기로 동작하게 두면서 느리게 처리할 필요가 없지 않은가?
이런 점을 해결할 수 있게 Spring에서는 @Async라는 애노테이션을 제공한다. 활용해보고 실제로 빠르게 동작하는지 확인해보자.
여기서 동기로 동작할 필요가 없는 로직이라는 건 특정 로직을 3번 호출한다고 가정했을 때,
1번 끝나고 2번 호출, 2번 끝나고 3번 호출
반드시 이런 방식이 아닌 (=서로의 작업이 서로에게 영향을 줄 수 있는 케이스)
1번, 2번, 3번 작업을 동시에 수행해도 서로의 동작에 영향을 주지 않는 상황을 말한다.
@Async
새 트랜잭션
@Transactional(propagation = Propagation.REQUIRES_NEW)
처럼 @Async
가 붙은 메서드를 호출하면 새로운 스레드
가 시작된다. 이렇게 생성된 스레드는 호출부의 트랜잭션과 생명주기가 달라지므로, 어느쪽에서 롤백을 하든 영향을 주지 않는다.
프록시
@Transactional
이 붙은 메서드 호출 시 Sping AOP에 의해 프록시 객체를 참조하게 되는 것처럼, @Async
역시 프록시 객체를
참조하게 된다. 그렇기 때문에 프록시 객체의 함수를 사용하고 싶다면 조심해야 할 부분도 같다.
- private 사용 금지
기존 클래스를 상속한 프록시 객체의 메서드를 참조하는 것이기 때문에 사실 부모 클래스의 메서드를 호출하는 것과 같다.
따라서 호출하려는 메서드의 접근 제한자가 private이면 자식 클래스 즉, 프록시 객체에서 호출해줄 수 없게된다.
- 같은 클래스에서 호출하면 안됨
같은 클래스에 속한 메서드 A, B가 있다고 해보자. B가 A를 호출하는 로직이라고 할 때, A가 프록시 객체의 메서드이길 기대한다면
(@Transactional
, @Async
가 적용된 케이스 등) 다음처럼 작성하면 안된다. 기대한대로 동작하지 않는다.
@Service
class TestService {
@Async
fun A() {}
fun B() { A() } // this.A()와 같음
}
A()
가 this.A()
와 같아서 기대한대로 동작하지 않을 것이다.
this
는 프록시 객체가 아닌 단순히 TestService를 가리키기 때문이다!
그래서 A()
호출했을 때 프록시 객체의 메서드이길 바란다면 TestService
가 아닌 다른 클래스에서 호출하는 것도 방법이다.
여기서는 @Async
가 정상적으로 동작하지 않으니까, 동기로 동작한다는걸 예상할 수 있다.
Listener에 적용한다면
Listener에서는 위의 TestService
처럼 자가 호출(self-invocation)되도록 로직을 작성하지 않는다면 괜찮다.
덤으로 프록시로 잘 동작할 수 있도록 접근 제한자를 private으로 설정하지 않도록 해야한다!
// TestListener
@Async
@TransactionalEventListener(phase = AFTER_COMMIT)
fun handleEventWithAsyncTx(testData: TestData) {
log.info("[TxEventListener - AFTER_COMMIT] thread id : ${Thread.currentThread().id} request: ${testData.message}")
testRepository.save(TestEntity("test in listener"))
}
설정
@Async
를 사용할 거라고 Spring에게 미리 알려주는 설정이 필요하다.
간단하게 설정하는 방법은 @SpringBootApplication
이 있는 클래스에 @EnableAsync를 직접 적어주는 방법이 있다.
@Async
가 동작할 환경 설정도 세세하게 해줄 수 있는데, 이 케이스는 기본값인 SimpleAsyncTaskExecutor
를 사용한다.
하지만 이 설정은 해당 메서드를 호출할 때마다 기존 스레드를 재사용하지 않고 새로운 스레드를 시작한다는 문제점이 있다. (비추천)
@EnableAsync
@SpringBootApplication
class DemoApplication
fun main(args: Array<String>) {
runApplication<DemoApplication>(*args)
}
다른 방법으로, @Configuration
가 있는 설정 클래스에 @EnableAsync를 적용하고 아래처럼 설정해줄 수 있다.
커넥션 풀 설정을 세세히 할 수 있기때문에 SimpleAsyncTaskExecutor
보다 안전한 환경에서 동작할 수 있을 것이다.
@Configuration
@EnableAsync
class AsyncConfig {
@Bean
fun taskExecutor(): ThreadPoolTaskExecutor {
val executor = ThreadPoolTaskExecutor()
executor.setThreadNamePrefix("async-executor-")
executor.corePoolSize = POOL_SIZE
executor.maxPoolSize = POOL_SIZE * 2
executor.queueCapacity = QUEUE_SIZE
executor.setWaitForTasksToCompleteOnShutdown(true)
executor.initialize()
return executor
}
companion object {
private const val POOL_SIZE = 3
private const val QUEUE_SIZE = 3
}
}
corePoolSize
: 동작시킬 최소 스레드 수maxPoolSize
: 동작 가능한 최대 스레드 수queueCapacity
: 작업 큐에 보관 가능한 스레드 수setWaitForTasksToCompleteOnShutdown
: 작업이 완료될 때까지 기다리고 스레드 종료
다른 방법으로, application.yml
에 등록도 가능하다.
spring:
task:
execution:
pool:
core-size: 8
max-size: 16
테스트
@Async
Listener에 붙이기 이전에 @Async 자체만 잘 동착하는지 테스트해보자.
아래는 TestService
에 바동기로 동작하는 testService2.asyncMethod()
를 호출하는 testAsync()
가 있고,
동기로 동작하는 testService2.syncMethod()
를 호출하는 testSync()
가 있다. 각각 연속해서 8번 호출한다.
// TestService
fun testSync() {
log.info("** Sync test **")
val runningTime = measureTimedValue {
repeat(5) {
testService2.syncMethod()
}
}
log.info("running time : ${runningTime.value}")
}
fun testAsync() {
log.info("** Async test **")
val runningTime = measureTimedValue {
repeat(5) {
testService2.asyncMethod()
}
}
log.info("running time : ${runningTime.value}")
}
위에서 호출한다던 asyncMethod()
, syncMethod()
내용이다.
동기로 동작한다면 3초마다 로그가 찍힐 거고 비동기로 동작한다면 3초가 지났을 때 파바박 뜨는걸 예상할 수 있다!
// TestService2
fun syncMethod() {
sleep(3000)
log.info("call syncMethod - ${Thread.currentThread().name}")
}
@Async
fun asyncMethod() {
sleep(3000)
log.info("call asyncMethod - ${Thread.currentThread().name}")
}
@Async
설정은 위에 써놓은AsyncConfig
와 같음corePoolSize
= 3maxPoolSize
= 6queueCapacity
= 3
testSync()
를 호출하면 1개 스레드로 syncMethod()
를 3초씩 8번 연이어서 실행하기 때문에 약 24초의 시간이 걸린다.
testAsync()
를 호출하면 한번에 5개 스레드로 처리하는데 찍히는 시간은 1.57ms다.
사실 내부에서 asyncMethod()
가 1번 완전히 동작하는데만 3초 정도가 걸릴텐데 1.57ms
만에 8번 다 동작한다고 보기는 어렵고,
비동기 동작이라는게 호출한 함수가 결과가 나올 때까지(끝날 때까지) 기다리는게 아니고 바로 다음 동작으로 넘어가는 방식이기 때문에
8번 휙휙 지나가고 바로 로그를 찍어서 1.57ms
가 걸렸다고 보면 될 것 같다.
그래서 첫 함수가 종료되기 전에 running time 로그가 가장 먼저 찍힌걸 볼 수 있다!
3초 간격을 한 사이클이라고 하면 그래도 첫 사이클에 5개 스레드가 돌아서 5번 한번에 처리하고, 그 다음에 3개가 한번에 처리된다!
왜 maxPoolSize = 6
인데 한번에 6개가 안돌고 5개만 돌았는지 간단히 설명하면,
corePoolSize = 3
이므로 한번에 최소 3개까지는 돌게 되며 이후 들어오는 요청은 우선적으로 queue에 쌓이게 된다.
queueCapacity = 3
이므로 queue에 3개까지 쌓이고(들어가는 스레드는 대기 상태), 총 8번 호출했으니까 나머지 2번은 queue
에 더 저장할 수 없어서 queue에 가장 먼저 들어간 2개를 pop -> 실행한 뒤, 새 스레드 2개를 queue에 push한다.
그래서 한 사이클에 5개가 돌 수 있었던 거고, maxPoolSize = 6
이므로 최대 6개까지 돌 수 있다.
maxPoolSize
을 초과하는 만큼 요청이 들어오면 RejectedExecutionException
가 발생한다.
Listener에 @Async 걸어보기
@Async
걸기 전에, Listener에서 TestData를 기다리는 메서드가 2개 구현됐다고 해보자.
@EventListener
fun handleEventWithSync(testData: TestData) {
log.info("handleEventWithSync")
sleep(2000)
}
@EventListener
fun handleEventWithSync2(testData: TestData) {
log.info("handleEventWithSync2")
sleep(2000)
}
실행하면 아래처럼 동작한다.
handleEventWithSync()
호출 -> 2초 기다림 -> handleEventWithSync2()
호출
같이 실행되는게 아니고 순차적으로 실행되는 셈이다!
여기서 두 메서드에 @Async
를 걸면 거의 같이 호출된다!(=비동기로 동작) 매우 간단하게 적용할 수 있다.
@Async
@EventListener
fun handleEventWithSync(testData: TestData) {
log.info("handleEventWithAsync")
sleep(2000)
}
@Async
@EventListener
fun handleEventWithSync2(testData: TestData) {
log.info("handleEventWithAsync2")
sleep(2000)
}
실행해보면 순차적으로 호출되지 않고 동시에 로그를 출력하는걸 확인할 수 있다.
댓글남기기