[Spring] Kafka 세부 설정하기
스프링 프로젝트에서 kafka를 적용할 때 간단한 설정은 application.yml
에서 해줄 수 있었다.
하지만 Consumer에서 데이터를 꺼내오는 주기 등의 세부 설정은 application.yml
에서 할 수 없어서 설정 클래스를 등록했다.
Consumer
설정 클래스 정의
다음은 Consumer 세부 설정을 할 수 있는 클래스 KafkaConsumerConfig
코드다.
application.yml
에서 일부 설정까지는 해줄 수 있었지만 MAX_POLL_INTERVAL_MS_CONFIG
등의 세부 설정은 클래스를
통해서 등록해야 한다.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@ConditionalOnMissingBean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-id");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 15000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean(name="kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
ENABLE_AUTO_COMMIT_CONFIG
: 메시지 offset의 commit 간격 자동 처리 여부MAX_POLL_RECORDS_CONFIG
: 가져올 메시지 최대 개수MAX_POLL_INTERVAL_MS_CONFIG
: Consumer 메시지 처리 주기HEARTBEAT_INTERVAL_MS_CONFIG
: Consumer가 살아있는지 알리는 신호를 브로커에 보내는 주기SESSION_TIMEOUT_MS_CONFIG
: Consumer 그룹의 멤버가 브로커와 연결을 유지해야 하는 시간AckMode
→MANUAL_IMMEDIATE
: Acknowledging이 붙은 리스너 사용
Acknowledgment 설정
Consumer는 메시지 처리 후 acknowledgment.acknowledge()
를 호출해 해당 메시지의 offset을 커밋한다.
매번 Consumer는 커밋된 offset 이후의 메시지부터 처리하게 되므로 중복 처리를 방지하고 메시지 처리의 신뢰성을 유지할 수 있다.
@KafkaListener(topics = "test-topic", groupId = "test-group-id")
public void consumeCrawledProductsV4(String productJson, Acknowledgment acknowledgment) {
updateCrawledProductsProduct(productJson);
acknowledgment.acknowledge(); //메시지 offset 커밋
...
}
Producer
설정 클래스 정의
Consumer와 같은 이유로 세부 설정을 위해 설정 클래스 KafkaTemplateConfig
를 정의한다.
@Configuration
public class KafkaTemplateConfig {
private static final String BOOTSTRAP_SERVER = "localhost:9092";
@Bean
public KafkaTemplate<String, String> customKafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");
//KafkaTemplate 객체 생성 시 프로듀서 옵션을 직접 넣음
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
return new KafkaTemplate<>(producerFactory);
}
}
ACKS_CONFIG
→all
: 보낸 메시지를 leader, follower 모두 받았는지 확인 (손실↓)REQUEST_TIMEOUT_MS_CONFIG
: 요청 후 응답이 도착할 때까지 기다리는 시간 (ms 기준)
References
- Kafka 공식 문서 : CONFIGURATION
- AndersonChoi: Kafka consumer의 Automatic Commit은 중복이 생길 수 있다
- 제제킴 : [Kafka] 같은메시지를 반복적으로 소비했던 리밸런싱 이슈 해결 (MAX POLL RECORDS CONFIG = “max.poll.record
- hanseom: 07. 스프링 카프카 프로듀서(Spring kafka Producer)
- hanseom: 08. 스프링 카프카 컨슈머(Spring Kafka Consumer)
- 고승범: Kafka 운영자가 말하는 Producer ACKS
- jelly: [Kafka + Spring Boot Tip] Duplicate Topic Receive & Already rebalanced and assigned Error Solution
댓글남기기