[Spring] Kafka 적용하기
스프링 프로젝트에 Kafka를 적용해서 테스트까지 해보자!
Kafka
기본 개념
- 브로커 : 데이터를 주고 받기위해 사용하는 주체로, 서버 1개당 1개 브로커가 실행됨
- 카프카 클러스터 : 여러 브로커 관리
- 프로듀서 : 특정 토픽으로 메시지 보냄
- 컨슈머 : 특정 토픽으로 들어오는 메시지 받음
- 주키퍼 : 카프카 클러스터를 실행하기 위해 필요하며, 여러 카프카 클러스터 운영 가능
docker-compose.yml
스프링에 연결하기 전에 kafka를 docker 컨테이너로 띄워주자.
다음 내용을 docker-compose.yml
로 저장하고, docker compose up -d
를 입력하면 이미지를 받아서 실행까지 진행한다.
물론 로컬로 따로 설치해도 된다!
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
depends_on:
- zookeeper
- M1 칩을 사용한다면
zookeeper
에platform: linux/amd64
를 추가해줘야 함
의존성 추가
Kafka를 사용할 수 있도록 다음처럼 의존성을 추가해주자.
implementation 'org.springframework.kafka:spring-kafka'
application.yml
kafka 설정을 위해 다음 내용도 application.yml
에 추가해주자.
spring:
kafka:
bootstrap-servers:
- 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
type: batch
consumer:
group-id: crawl-group-id
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
KafkaProducer
프로듀서는 특정 토픽으로 메시지를 전달하는 역할을 한다.
다음처럼 KafkaTemplate.send()
를 통해 토픽으로 메시지를 전달할 수 있다.
@RequiredArgsConstructor
@Component
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void makeMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
KafkaConsumer
컨슈머는 특정 토픽으로 들어오는 메시지를 받는 역할을 한다.
다음처럼 @KafkaListener
로 메시지를 받을 토픽을 설정해줄 수 있고, groupId
를 설정해 컨슈머 그룹을 설정해줄 수 있다.
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group-id")
public void consumeMessage(String message) {
log.info("message: {}", message);
}
}
test-topic
토픽에 있는 메시지를 받아 파라미터message
로 가져옴
MessageController
프로듀서에 구현한 makeMessage
를 활용해 다음처럼 설정해주면 테스트까지 해볼 수 있다.
localhost:8080/kafka?msg=hello
로 접속하면 아래에서 정의한 call()
이 호출되고 프로듀서에서 토픽 test-topic
으로
입력받은 메시지 hello
를 보낼 것이고 컨슈머는 test-topic
에 들어온 메시지 hello
를 받아 로그로 출력하게 된다!
@RequiredArgsConstructor
@Controller
public class MessageController {
private final KafkaProducer kafkaProducer;
@PostMapping("/kafka")
public void call(@RequestParam("msg") String message) {
kafkaProducer.makeMessage("test-topic", message);
}
}
댓글남기기