1 분 소요

스프링 프로젝트에서 kafka를 적용할 때 간단한 설정은 application.yml에서 해줄 수 있었다.
하지만 Consumer에서 데이터를 꺼내오는 주기 등의 세부 설정은 application.yml에서 할 수 없어서 설정 클래스를 등록했다.

Consumer

설정 클래스 정의

다음은 Consumer 세부 설정을 할 수 있는 클래스 KafkaConsumerConfig 코드다.
application.yml에서 일부 설정까지는 해줄 수 있었지만 MAX_POLL_INTERVAL_MS_CONFIG 등의 세부 설정은 클래스를
통해서 등록해야 한다.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @ConditionalOnMissingBean
    public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-id");

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 15000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);

        return new DefaultKafkaConsumerFactory<>(props);

    }

    @Bean(name="kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
  • ENABLE_AUTO_COMMIT_CONFIG : 메시지 offset의 commit 간격 자동 처리 여부
  • MAX_POLL_RECORDS_CONFIG : 가져올 메시지 최대 개수
  • MAX_POLL_INTERVAL_MS_CONFIG : Consumer 메시지 처리 주기
  • HEARTBEAT_INTERVAL_MS_CONFIG : Consumer가 살아있는지 알리는 신호를 브로커에 보내는 주기
  • SESSION_TIMEOUT_MS_CONFIG : Consumer 그룹의 멤버가 브로커와 연결을 유지해야 하는 시간
  • AckModeMANUAL_IMMEDIATE : Acknowledging이 붙은 리스너 사용

Acknowledgment 설정

Consumer는 메시지 처리 후 acknowledgment.acknowledge()를 호출해 해당 메시지의 offset을 커밋한다. 매번 Consumer는 커밋된 offset 이후의 메시지부터 처리하게 되므로 중복 처리를 방지하고 메시지 처리의 신뢰성을 유지할 수 있다.

@KafkaListener(topics = "test-topic", groupId = "test-group-id")
public void consumeCrawledProductsV4(String productJson, Acknowledgment acknowledgment) {
    updateCrawledProductsProduct(productJson);
    acknowledgment.acknowledge(); //메시지 offset 커밋
    ...
}

Producer

설정 클래스 정의

Consumer와 같은 이유로 세부 설정을 위해 설정 클래스 KafkaTemplateConfig를 정의한다.

@Configuration
public class KafkaTemplateConfig {

    private static final String BOOTSTRAP_SERVER = "localhost:9092";

    @Bean
    public KafkaTemplate<String, String> customKafkaTemplate() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");

        //KafkaTemplate 객체 생성 시 프로듀서 옵션을 직접 넣음
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(producerFactory);
    }
}
  • ACKS_CONFIGall : 보낸 메시지를 leader, follower 모두 받았는지 확인 (손실↓)
  • REQUEST_TIMEOUT_MS_CONFIG : 요청 후 응답이 도착할 때까지 기다리는 시간 (ms 기준)

References

카테고리:

업데이트:

댓글남기기