[Kafka] DLQ(Dead Letter Queue)
Kafka를 적용한 서비스를 운영중이라면 토픽 발행이 잘 되는지, 발행은 됐지만 컨슈머에서 처리가 안됐을 때는 어떻게 해야할지에 대해
고민해볼 필요가 있다. 유실돼도 괜찮은 거라면 중요도가 떨어지겠지만, 반드시 처리돼야 하는 케이스도 있기 때문이다.
TOSS SLASH 23 - 은행 최초 코어뱅킹 MSA 전환기 (feat. 지금 이자 받기)의 성능 개선을 위한 비동기 처리 에서 Kafka 메시지 큐 구성을 그림으로 보았을 때 데이터 정합성을 보장하기 위해 Dead Letter Queue(이하 DLQ)를 썼다는 점을 간단하게는 알 수 있었지만, 컨슈머에서 처리가 되지 못했을 때 retry를 적용했다는 점 외에는 자세히 써있지 않았다. 구체적으로 처리할 수 있는 방안이 뭐가 있을지 궁금해서 찾아보게 되었다.
DLQ(Dead Letter Queue)
Kafka, RabbitMQ 등의 기술은 message queue
를 사용한다. message queue에 메시지를 쌓아두면 pub-sub으로 처리를 하든,
컨슈머가 message queue에 있는 메시지를 가져가든, 아무튼 컨슈머에서 처리를 하게 된다. 그 과정에서 정상적으로 처리가 되지 못했을 때 재처리
용도로 DLQ를 사용할 수 있다. 최후의 보루로 두는 queue라고 생각하면 된다.
컨슈머 상태가 좋지 않아서 또는 로직 내부에서 외부 서버와 통신하는데 정상 응답을 받지 못해서 정상적으로 작업을 완료할 수 없을 때
사용하는게 일반적이다. 추후에는 언급한 문제들이 해소된 상황에서 처리될 수 있을거라는 기대가 있기 때문이다. 메시지에 있는 값이
컨슈머에서 처리할 수 없는 값일 때 개선 후 처리하기 위해 사용될 수도 있다.
CommonErrorHandler
코드상에서는 어떻게 처리하면 될까? try-catch로 만들고 catch에서 DLQ로 토픽을 발행하도록 만들면 괜찮지 않을까?
그렇게 처리해도 괜찮겠지만 만약 Listener 클래스가 10개라 하면 클래스 10개를 모두 수정해줘야한다. 그건 좀 번거롭다..!
spring-kafka 에서는 이걸 쉽게 처리할 수 있게 CommonErrorHandler 인터페이스를 제공한다.
CommonErrorHandler
구현체를 빈으로 등록해주고 컨슈머 설정 클래스에서 CommonErrorHandler
빈을 주입받아 사용하면 된다.
// 컨슈머 설정 클래스
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler); // myErrorHandler는 CommonErrorHandler 빈
...
return factory;
}
- skahn1215님의 글에서
CommonErrorHandler
구현체 코드를 참조할 수 있다. CommonErrorHandler
구현체 코드는 아래 참고
The framework provides the DeadLetterPublishingRecoverer, which publishes the failed message to another topic.
CommonErrorHandler
구현체를 만들 때는 DeadLetterPublishingRecoverer
를 정의해줘야 한다.
주의할 점은 별도 설정을 하지 않으면 <기존 토픽명>.DLT
토픽 + 기존 파티션
으로 발행되므로 dead letter topic(이하 DLT)은 기존 토픽의 파티션 수보다 많거나 작아야 한다!
임의의 토픽명과 파티션 번호로도 토픽 발행이 가능하다.
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
DLQ는 최후의 보루이니만큼 가능하면 DLQ로 쌓기 전에 정해진 리소스 안에서 최대한 처리를 하는게 맞다.
그래서 retry를 통해 처리를 하되, 적절한 retry 전략을 적용한다면 혼잡한 상황을 피하면서 원활한 처리를 기대할 수 있다.
retry 전략으로 exponentional backoff, jitter를 사용할 수 있다.
그러면 Producer는?
Producer에서 토픽을 발행할 때 네트워크 I/O 이슈가 발생할 수 있다보니 토픽 발행에 실패할 수 있다. 위에서는 일단 토픽 발행은
성공했다고 가정하고 작업 실패 시 DLQ를 사용하는 거라서 끄덕일 수 있었는데, 사실 토픽 발행 자체가 실패하면 바로 유실되는 것이라서
굉장히 곤란해지겠다는 생각이 들었다. 토픽 발행에 실패했는지 알려면 어떻게 해야할까?
ProducerListener
spring kafka에서는 ProducerListener 인터페이스를 제공하고 있다.
ProducerListener
구현체를 빈으로 등록하면 토픽 발행 실패 시 콜백 함수로 onError()
를 호출한다.
public interface ProducerListener<K, V> {
// 토픽 발행 성공 시 호출
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
// 토픽 발행 실패 시 호출
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
- 모든 Producer에 대해 전역적으로 처리하는 방식이라서, 개별로 적용하고 싶다면 CompletableFuture.whenComplete() 콜백 함수를 구현하면 된다.
그렇다면 onError()
를 호출할 때 토픽 발행을 실패한거니까, 이때 retry로 토픽이 발행되지 않는 상황을 줄일 수 있다.
DLQ가 기존 message queue와 다른 서버에서 운영된다면 DLQ에 토픽 발행을 하는 것도 방법인 것 같다. 그치만 둘 다 토픽 발행
실패 상황을 최소화하는 전략일 뿐 완벽히 해소해주지 않는다.
위 방법이 모두 실패했을 때는 로그를 남기고 알림을 개발자에게 날려서 수동으로 대응하는 방법도 있다.
Transactional Outbox Pattern
트랜잭션 안에서 outbox 테이블에 이벤트 메시지를 insert 하고, CDC
를 통해서 토픽 발행을 한다면 문제를 해결할 수 있다.
CDC(Change Data Capture)
는 테이블의 변경을 감지해 행위(토픽 발행)를 발생시키는 기법을 말한다. CDC
는 Debezium이라는 오픈소스를 사용해서 적용할 수 있다.
Debezium
은 (MySQL 기준으로) binlog를 읽어서 순차적으로 처리하고, 트랜잭션이 성공했을 때 outbox 테이블에 갱신이 됐음을
보장할 수 있기때문에 토픽 발행이 누락되는 케이스에 대해서 크게 걱정하지 않아도 된다.
References
- TOSS SLASH 23 : 은행 최초 코어뱅킹 MSA 전환기 (feat. 지금 이자 받기)
- spring-kafka docs : ProducerListener
- spring-kafka docs : CommonErrorHandler, Publishing Dead-letter Records
- skahn1215 : [Kafka] DLQ(Dead Letter Queue)
- nooblette: [시스템 디자인] 실습으로 배우는 선착순 이벤트 시스템 (번외) - DLT(DeadLetterTopic)을 이용한 메시지 Consume 재처리
- 정섭 : Retry 전략에 대해서(Exponential Backoff, Jitter)
- tillog : CDC란?
댓글남기기