1 분 소요

스프링 프로젝트에 Kafka를 적용해서 테스트까지 해보자!

Kafka

기본 개념

  • 브로커 : 데이터를 주고 받기위해 사용하는 주체로, 서버 1개당 1개 브로커가 실행됨
  • 카프카 클러스터 : 여러 브로커 관리
  • 프로듀서 : 특정 토픽으로 메시지 보냄
  • 컨슈머 : 특정 토픽으로 들어오는 메시지 받음
  • 주키퍼 : 카프카 클러스터를 실행하기 위해 필요하며, 여러 카프카 클러스터 운영 가능

docker-compose.yml

스프링에 연결하기 전에 kafka를 docker 컨테이너로 띄워주자.
다음 내용을 docker-compose.yml로 저장하고, docker compose up -d를 입력하면 이미지를 받아서 실행까지 진행한다.
물론 로컬로 따로 설치해도 된다!

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
    depends_on:
      - zookeeper
  • M1 칩을 사용한다면 zookeeperplatform: linux/amd64를 추가해줘야 함

의존성 추가

Kafka를 사용할 수 있도록 다음처럼 의존성을 추가해주자.

implementation 'org.springframework.kafka:spring-kafka'

application.yml

kafka 설정을 위해 다음 내용도 application.yml에 추가해주자.

spring:
  kafka:
    bootstrap-servers:
      - 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      type: batch
    consumer:
      group-id: crawl-group-id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

KafkaProducer

프로듀서는 특정 토픽으로 메시지를 전달하는 역할을 한다.
다음처럼 KafkaTemplate.send()를 통해 토픽으로 메시지를 전달할 수 있다.

@RequiredArgsConstructor
@Component
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void makeMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

KafkaConsumer

컨슈머는 특정 토픽으로 들어오는 메시지를 받는 역할을 한다.
다음처럼 @KafkaListener로 메시지를 받을 토픽을 설정해줄 수 있고, groupId를 설정해 컨슈머 그룹을 설정해줄 수 있다.

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic", groupId = "test-group-id")
    public void consumeMessage(String message) {
        log.info("message: {}", message);
    }
}
  • test-topic 토픽에 있는 메시지를 받아 파라미터 message로 가져옴

MessageController

프로듀서에 구현한 makeMessage를 활용해 다음처럼 설정해주면 테스트까지 해볼 수 있다.
localhost:8080/kafka?msg=hello로 접속하면 아래에서 정의한 call()이 호출되고 프로듀서에서 토픽 test-topic으로
입력받은 메시지 hello를 보낼 것이고 컨슈머는 test-topic에 들어온 메시지 hello를 받아 로그로 출력하게 된다!

@RequiredArgsConstructor
@Controller
public class MessageController {

    private final KafkaProducer kafkaProducer;

    @PostMapping("/kafka")
    public void call(@RequestParam("msg") String message) {
        kafkaProducer.makeMessage("test-topic", message);
    }
}

References

카테고리:

업데이트:

댓글남기기