ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spring Boot] RabbitMQ 연동하기
    Spring Boot/기타 2023. 6. 4. 18:27
    반응형

    RabbitMQ는 메시지를 생산하는 생산자(Producer)가 메시지를 큐(Queue)에 저장해 두면, 메시지를 수신하는 소비자(Consumer)가 메시지를 가져와 처리하는 Publish/Subscribe 방식의 메시지 전달 브로커입니다.

    RabbitMQ : https://www.rabbitmq.com

     

    1. Docker를 사용하여 RabbitMQ 설치

    Docker를 사용하여 RabbitMQ를 로컬에 설치하세요.

    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq:management

    포트 5672는 RabbitMQ 클라이언트 연결에 사용되고, 포트 15672는 RabbitMQ 웹 관리 콘솔에 사용됩니다.

    • 웹 관리 콘솔 : http://localhost:15672 
    • 기본 계정 및 비밀번호 : guest/guest

     

    RabbitMQ 웹 관리 콘솔

     

    2. 의존성 추가

    Spring AMQP 및 Rabbit MQ 사용을 위한 'org.springframework.boot:spring-boot-starter-amqp'를 추가하세요.

    Spring AMQP : https://spring.io/projects/spring-amqp

    build.gradle

    plugins {
        id 'java'
        id 'org.springframework.boot' version '3.1.0'
        id 'io.spring.dependency-management' version '1.1.0'
    }
    
    group = 'com.example'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '17'
    
    configurations {
        compileOnly {
            extendsFrom annotationProcessor
        }
    }
    
    repositories {
        mavenCentral()
    }
    
    dependencies {
        // Spring Boot
        implementation 'org.springframework.boot:spring-boot-starter-web'
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
    
        // RabbitMQ
        implementation 'org.springframework.boot:spring-boot-starter-amqp'
    
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        testCompileOnly 'org.projectlombok:lombok'
        testAnnotationProcessor 'org.projectlombok:lombok'
    }
    
    tasks.named('test') {
        useJUnitPlatform()
    }


    3. RabbitMQ 설정

    RabbitMQ와의 연결을 설정하고, 메시지 전송을 위한 Queue, DirectExchange, Binding 등의 구성 요소를 추가하세요.
    RabbitMQConfig.java

    package com.example.rabbitmqsample.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
    
      @Value("${spring.rabbitmq.host}")
      private String rabbitmqHost;
    
      @Value("${spring.rabbitmq.port}")
      private int rabbitmqPort;
    
      @Value("${spring.rabbitmq.username}")
      private String rabbitmqUsername;
    
      @Value("${spring.rabbitmq.password}")
      private String rabbitmqPassword;
    
      @Value("${rabbitmq.queue.name}")
      private String queueName;
    
      @Value("${rabbitmq.exchange.name}")
      private String exchangeName;
    
      @Value("${rabbitmq.routing.key}")
      private String routingKey;
    
      /**
       * 지정된 큐 이름으로 Queue 빈을 생성
       *
       * @return Queue 빈 객체
       */
      @Bean
      public Queue queue() {
        return new Queue(queueName);
      }
    
      /**
       * 지정된 익스체인지 이름으로 DirectExchange 빈을 생성
       *
       * @return TopicExchange 빈 객체
       */
      @Bean
      public DirectExchange exchange() {
        return new DirectExchange(exchangeName);
      }
    
      /**
       * 주어진 큐와 익스체인지를 바인딩하고 라우팅 키를 사용하여 Binding 빈을 생성
       *
       * @param queue    바인딩할 Queue
       * @param exchange 바인딩할 TopicExchange
       * @return Binding 빈 객체
       */
      @Bean
      public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
      }
    
      /**
       * RabbitMQ 연결을 위한 ConnectionFactory 빈을 생성하여 반환
       *
       * @return ConnectionFactory 객체
       */
      @Bean
      public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(rabbitmqPort);
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        return connectionFactory;
      }
    
      /**
       * RabbitTemplate을 생성하여 반환
       *
       * @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체
       * @return RabbitTemplate 객체
       */
      @Bean
      public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
      }
    
      /**
       * Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
       *
       * @return MessageConverter 객체
       */
      @Bean
      public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
      }
    }


    application.yml

    RabbitMQ 연결 정보 및 Queue, Exchange, Routing-Key 정보를 추가하세요.

    # Service Post 설정
    server:
      port: 9091
    
    # Log Level 설정
    logging:
      level:
        root: info
    
    # RabbitMQ 연결 정보
    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
    
    # RabbitMQ queue, exchange, routing-key 정보
    rabbitmq:
      queue.name: sample.queue
      exchange.name: sample.exchange
      routing.key: sample.key

     

    4. RabbitMQ Service 추가

    Queue로 메시지를 발행할 때는 RabbitTemplate 클래스에 convertAndSend 메서드를 사용하고, Queue에서 메시지를 구독할 때는 하는 @RabbitListener 어노테이션을 사용한 메서드를 추가하세요.

    MessageService.java

    package com.example.rabbitmqsample.service;
    
    import com.example.rabbitmqsample.dto.MessageDto;
    import lombok.AllArgsConstructor;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    
    @Slf4j
    @RequiredArgsConstructor
    @Service
    public class MessageService {
    
      @Value("${rabbitmq.exchange.name}")
      private String exchangeName;
    
      @Value("${rabbitmq.routing.key}")
      private String routingKey;
    
      private final RabbitTemplate rabbitTemplate;
    
      /**
       * Queue로 메시지를 발행
       *
       * @param messageDto 발행할 메시지의 DTO 객체
       */
      public void sendMessage(MessageDto messageDto) {
        log.info("message sent: {}", messageDto.toString());
        rabbitTemplate.convertAndSend(exchangeName, routingKey, messageDto);
      }
    
      /**
       * Queue에서 메시지를 구독
       *
       * @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체
       */
      @RabbitListener(queues = "${rabbitmq.queue.name}")
      public void reciveMessage(MessageDto messageDto) {
        log.info("Received message: {}", messageDto.toString());
      }
    }

     

    MessageDto.java

    메시지를 보내기 위한 DTO 클래스를 추가하세요.

    package com.example.rabbitmqsample.dto;
    
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Setter;
    import lombok.ToString;
    
    @Getter
    @Setter
    @ToString
    @AllArgsConstructor
    @NoArgsConstructor
    public class MessageDto {
    
      private String title;
      private String content;
    }

     

    5. RabbitMQ Controller 추가

    클라이언트로부터 메시지를 받아 해당 메시지를 발행할 수 있도록 간단한 REST API 추가하세요.

    MessageController.java

    package com.example.rabbitmqsample.contoller;
    
    import com.example.rabbitmqsample.dto.MessageDto;
    import com.example.rabbitmqsample.service.MessageService;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    @Slf4j
    @RequiredArgsConstructor
    @RestController
    public class MessageController {
    
      private final MessageService messageService;
    
      /**
       * Queue로 메시지를 발행
       *
       * @param messageDto 발행할 메시지의 DTO 객체
       * @return ResponseEntity 객체로 응답을 반환
       */
      @RequestMapping(value = "/send/message", method = RequestMethod.POST)
      public ResponseEntity<?> sendMessage(@RequestBody MessageDto messageDto) {
        messageService.sendMessage(messageDto);
        return ResponseEntity.ok("Message sent to RabbitMQ!");
      }
    }



    6. 메시지 발행 및 구독 확인

    message.http

    ### 메시지 발행
    POST http://localhost:9091/send/message
    Content-Type: application/json
    
    {
        "title" : "Message Title Test",
        "content" : "Message Content Test"
    }

    콘솔 로그 확인

     

    소스 코드는 Github Repository - https://github.com/tychejin1218/message-queue.git 에서 rabbitmq-sample 프로젝트를 참조하세요.

    반응형

    댓글

Designed by Tistory.