3 분 소요

Kafka를 적용한 서비스를 운영중이라면 토픽 발행이 잘 되는지, 발행은 됐지만 컨슈머에서 처리가 안됐을 때는 어떻게 해야할지에 대해 고민해볼 필요가 있다. 유실돼도 괜찮은 거라면 중요도가 떨어지겠지만, 반드시 처리돼야 하는 케이스도 있기 때문이다.

TOSS SLASH 23 - 은행 최초 코어뱅킹 MSA 전환기 (feat. 지금 이자 받기)성능 개선을 위한 비동기 처리 에서 Kafka 메시지 큐 구성을 그림으로 보았을 때 데이터 정합성을 보장하기 위해 Dead Letter Queue(이하 DLQ)를 썼다는 점을 간단하게는 알 수 있었지만, 컨슈머에서 처리가 되지 못했을 때 retry를 적용했다는 점 외에는 자세히 써있지 않았다. 구체적으로 처리할 수 있는 방안이 뭐가 있을지 궁금해서 찾아보게 되었다.

DLQ(Dead Letter Queue)

?


Kafka, RabbitMQ 등의 기술은 message queue를 사용한다. message queue에 메시지를 쌓아두면 pub-sub으로 처리를 하든, 컨슈머가 message queue에 있는 메시지를 가져가든, 아무튼 컨슈머에서 처리를 하게 된다. 그 과정에서 정상적으로 처리가 되지 못했을 때 재처리 용도로 DLQ를 사용할 수 있다. 최후의 보루로 두는 queue라고 생각하면 된다.

컨슈머 상태가 좋지 않아서 또는 로직 내부에서 외부 서버와 통신하는데 정상 응답을 받지 못해서 정상적으로 작업을 완료할 수 없을 때 사용하는게 일반적이다. 추후에는 언급한 문제들이 해소된 상황에서 처리될 수 있을거라는 기대가 있기 때문이다. 메시지에 있는 값이 컨슈머에서 처리할 수 없는 값일 때 개선 후 처리하기 위해 사용될 수도 있다.

CommonErrorHandler

코드상에서는 어떻게 처리하면 될까? try-catch로 만들고 catch에서 DLQ로 토픽을 발행하도록 만들면 괜찮지 않을까?
그렇게 처리해도 괜찮겠지만 만약 Listener 클래스가 10개라 하면 클래스 10개를 모두 수정해줘야한다. 그건 좀 번거롭다..!
spring-kafka 에서는 이걸 쉽게 처리할 수 있게 CommonErrorHandler 인터페이스를 제공한다.
CommonErrorHandler 구현체를 빈으로 등록해주고 컨슈머 설정 클래스에서 CommonErrorHandler 빈을 주입받아 사용하면 된다.

// 컨슈머 설정 클래스
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler); // myErrorHandler는 CommonErrorHandler 빈
    ...
    return factory;
}
  • skahn1215님의 글에서 CommonErrorHandler 구현체 코드를 참조할 수 있다.
  • CommonErrorHandler 구현체 코드는 아래 참고

The framework provides the DeadLetterPublishingRecoverer, which publishes the failed message to another topic.

CommonErrorHandler 구현체를 만들 때는 DeadLetterPublishingRecoverer를 정의해줘야 한다.
주의할 점은 별도 설정을 하지 않으면 <기존 토픽명>.DLT 토픽 + 기존 파티션으로 발행되므로 dead letter topic(이하 DLT)은 기존 토픽의 파티션 수보다 많거나 작아야 한다! 임의의 토픽명과 파티션 번호로도 토픽 발행이 가능하다.

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
    (r, e) -> {
        if (e instanceof FooException) {
            return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
        }
        else {
            return new TopicPartition(r.topic() + ".other.failures", r.partition());
        }
    });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

DLQ는 최후의 보루이니만큼 가능하면 DLQ로 쌓기 전에 정해진 리소스 안에서 최대한 처리를 하는게 맞다.
그래서 retry를 통해 처리를 하되, 적절한 retry 전략을 적용한다면 혼잡한 상황을 피하면서 원활한 처리를 기대할 수 있다.
retry 전략으로 exponentional backoff, jitter를 사용할 수 있다.

그러면 Producer는?

Producer에서 토픽을 발행할 때 네트워크 I/O 이슈가 발생할 수 있다보니 토픽 발행에 실패할 수 있다. 위에서는 일단 토픽 발행은 성공했다고 가정하고 작업 실패 시 DLQ를 사용하는 거라서 끄덕일 수 있었는데, 사실 토픽 발행 자체가 실패하면 바로 유실되는 것이라서 굉장히 곤란해지겠다는 생각이 들었다. 토픽 발행에 실패했는지 알려면 어떻게 해야할까?

ProducerListener

spring kafka에서는 ProducerListener 인터페이스를 제공하고 있다.
ProducerListener 구현체를 빈으로 등록하면 토픽 발행 실패 시 콜백 함수로 onError()를 호출한다.

public interface ProducerListener<K, V> {
  // 토픽 발행 성공 시 호출
  void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
  // 토픽 발행 실패 시 호출
  void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
          Exception exception);
}
  • 모든 Producer에 대해 전역적으로 처리하는 방식이라서, 개별로 적용하고 싶다면 CompletableFuture.whenComplete() 콜백 함수를 구현하면 된다.

그렇다면 onError()를 호출할 때 토픽 발행을 실패한거니까, 이때 retry로 토픽이 발행되지 않는 상황을 줄일 수 있다.
DLQ가 기존 message queue와 다른 서버에서 운영된다면 DLQ에 토픽 발행을 하는 것도 방법인 것 같다. 그치만 둘 다 토픽 발행 실패 상황을 최소화하는 전략일 뿐 완벽히 해소해주지 않는다.

위 방법이 모두 실패했을 때는 로그를 남기고 알림을 개발자에게 날려서 수동으로 대응하는 방법도 있다.

Transactional Outbox Pattern

트랜잭션 안에서 outbox 테이블에 이벤트 메시지를 insert 하고, CDC를 통해서 토픽 발행을 한다면 문제를 해결할 수 있다.
CDC(Change Data Capture)는 테이블의 변경을 감지해 행위(토픽 발행)를 발생시키는 기법을 말한다. CDCDebezium이라는 오픈소스를 사용해서 적용할 수 있다.

Debezium은 (MySQL 기준으로) binlog를 읽어서 순차적으로 처리하고, 트랜잭션이 성공했을 때 outbox 테이블에 갱신이 됐음을 보장할 수 있기때문에 토픽 발행이 누락되는 케이스에 대해서 크게 걱정하지 않아도 된다.

References

카테고리:

업데이트:

댓글남기기