java.lang.IllegalStateException: Cannot send after the producer is closed
문제 상황
java.lang.IllegalStateException: Cannot send after the producer is closed
위 알람이 계속 배치에서 발생하고 있었고, 일시적인 현상이라고 생각했으나 불규칙한 주기로 발생하고 있어서 확인해보았다.
배치 로직에서 마지막에 kafka 토픽 발행하는 로직이 있었고 에러 설명처럼 kafka producer가 closed 상태임에도 발행 요청을
한게 원인이었다. 왜 closed 됐는지, closed된 producer에 어떻게 접근하게 됐는지 확인해볼 필요가 있었다.
원인은 생각보다 간단했다. 배치 로직 안에 비동기 작업이 포함됐기 때문이었다.
물론 비동기 작업이 포함됐다고 무조건 발생하지는 않는다. 아래에서 좀 더 자세히 설명한다.
배치
일반적인 서버는 (특수한 상황이 아니라면) 자주 종료되지 않는다. 즉 종료되는 상황이 정상적인 플로우가 아니다.
하지만 배치를 크론잡으로 돌리다보니 서버(파드)가 뜨면 배치 로직을 수행하고 완료되면 리소스를 정리하고 서버를 닫는다.
이런 특수성때문에 비동기 작업이 리소스 정리 전에 완료되지 않은 경우 에러가 발생하고 있었다.
리소스를 정리하면서 kafka producer 상태도 closed가 되고, 그런 producer에 접근해서 토픽 발행 요청을 하니 문제가 발생했다.
비동기 작업
동기로 꼭 돌아야하는 로직이 아니라면 비동기로 돌게 두어서 총 작업 시간을 단축시킬 수 있다.
에러가 발생하던 배치에서도 비동기로 도는 로직이 있었고, 이 작업이 지연된 경우 문제가 발생한 것이다.
// 작업 1 (동기)
// 작업 2 (동기)
// 비동기 작업 시작
// 작업 3 (동기)
- 작업1 -> 작업2 -> 작업3 순으로 수행
- 비동기 작업 시작은 작업2, 작업3 사이에 이뤄지지만 언제 종료될지 모른다
- 작업3 완료 후 크론잡 종료
비동기 작업은 대표적으로 2가지가 있다.
@Async
가 적용된 Listener로 이벤트 발행하는 방법과 코루틴
을 생성해 처리하는 방식이 있다.
해결 방법
일반 서버에서는 서버가 종료되기 전에 이미 작업중인 요청을 완료하는 graceful shutdown 설정을 해주면 편하다.
graceful shutdown을 유도하려면 TaskExecutor에 설정을 해주면 된다. -> baeldung 참고
비동기 작업이 @Async
가 적용된 Listener에서 발생한거라면 Async 설정에 정의해둔 TaskExecutor가 있을거라 거기에 똑같이
적용하면 된다.
하지만 나같은 경우 비동기 작업은 코루틴을 생성해서 처리하고 있었다. 그래서 TaskExecutor 설정을 해줄 수 없었다.
이 때는 사실 더 간단하다. join()
을 호출해주면 코루틴이 종료될 때까지 기다린다. (이후 로직은 코루틴이 종료된 뒤 실행된다)
val job = launch {
// 토픽 발행
}
job.join() // 코루틴이 종료될 때까지 대기
// 이후 로직 실행
위처럼 적용하니 안전하게 비동기 작업이 완료된 후 배치가 종료되어서 더 이상 에러가 발생하지 않았다.
댓글남기기