-
[Spring Boot] RabbitMQ 연동하기Spring Boot/기타 2023. 6. 4. 18:27반응형
RabbitMQ는 메시지를 생산하는 생산자(Producer)가 메시지를 큐(Queue)에 저장해 두면, 메시지를 수신하는 소비자(Consumer)가 메시지를 가져와 처리하는 Publish/Subscribe 방식의 메시지 전달 브로커입니다.
RabbitMQ : https://www.rabbitmq.com
1. Docker를 사용하여 RabbitMQ 설치
Docker를 사용하여 RabbitMQ를 로컬에 설치하세요.
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq:management
포트 5672는 RabbitMQ 클라이언트 연결에 사용되고, 포트 15672는 RabbitMQ 웹 관리 콘솔에 사용됩니다.
- 웹 관리 콘솔 : http://localhost:15672
- 기본 계정 및 비밀번호 : guest/guest
RabbitMQ 웹 관리 콘솔
2. 의존성 추가
Spring AMQP 및 Rabbit MQ 사용을 위한 'org.springframework.boot:spring-boot-starter-amqp'를 추가하세요.
Spring AMQP : https://spring.io/projects/spring-amqp
build.gradle
plugins { id 'java' id 'org.springframework.boot' version '3.1.0' id 'io.spring.dependency-management' version '1.1.0' } group = 'com.example' version = '0.0.1-SNAPSHOT' sourceCompatibility = '17' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { // Spring Boot implementation 'org.springframework.boot:spring-boot-starter-web' testImplementation 'org.springframework.boot:spring-boot-starter-test' // RabbitMQ implementation 'org.springframework.boot:spring-boot-starter-amqp' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testCompileOnly 'org.projectlombok:lombok' testAnnotationProcessor 'org.projectlombok:lombok' } tasks.named('test') { useJUnitPlatform() }
3. RabbitMQ 설정RabbitMQ와의 연결을 설정하고, 메시지 전송을 위한 Queue, DirectExchange, Binding 등의 구성 요소를 추가하세요.
RabbitMQConfig.javapackage com.example.rabbitmqsample.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Value("${spring.rabbitmq.host}") private String rabbitmqHost; @Value("${spring.rabbitmq.port}") private int rabbitmqPort; @Value("${spring.rabbitmq.username}") private String rabbitmqUsername; @Value("${spring.rabbitmq.password}") private String rabbitmqPassword; @Value("${rabbitmq.queue.name}") private String queueName; @Value("${rabbitmq.exchange.name}") private String exchangeName; @Value("${rabbitmq.routing.key}") private String routingKey; /** * 지정된 큐 이름으로 Queue 빈을 생성 * * @return Queue 빈 객체 */ @Bean public Queue queue() { return new Queue(queueName); } /** * 지정된 익스체인지 이름으로 DirectExchange 빈을 생성 * * @return TopicExchange 빈 객체 */ @Bean public DirectExchange exchange() { return new DirectExchange(exchangeName); } /** * 주어진 큐와 익스체인지를 바인딩하고 라우팅 키를 사용하여 Binding 빈을 생성 * * @param queue 바인딩할 Queue * @param exchange 바인딩할 TopicExchange * @return Binding 빈 객체 */ @Bean public Binding binding(Queue queue, DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(routingKey); } /** * RabbitMQ 연결을 위한 ConnectionFactory 빈을 생성하여 반환 * * @return ConnectionFactory 객체 */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(rabbitmqHost); connectionFactory.setPort(rabbitmqPort); connectionFactory.setUsername(rabbitmqUsername); connectionFactory.setPassword(rabbitmqPassword); return connectionFactory; } /** * RabbitTemplate을 생성하여 반환 * * @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체 * @return RabbitTemplate 객체 */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정 rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter()); return rabbitTemplate; } /** * Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성 * * @return MessageConverter 객체 */ @Bean public MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }
application.ymlRabbitMQ 연결 정보 및 Queue, Exchange, Routing-Key 정보를 추가하세요.
# Service Post 설정 server: port: 9091 # Log Level 설정 logging: level: root: info # RabbitMQ 연결 정보 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest # RabbitMQ queue, exchange, routing-key 정보 rabbitmq: queue.name: sample.queue exchange.name: sample.exchange routing.key: sample.key
4. RabbitMQ Service 추가
Queue로 메시지를 발행할 때는 RabbitTemplate 클래스에 convertAndSend 메서드를 사용하고, Queue에서 메시지를 구독할 때는 하는 @RabbitListener 어노테이션을 사용한 메서드를 추가하세요.
MessageService.java
package com.example.rabbitmqsample.service; import com.example.rabbitmqsample.dto.MessageDto; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @Slf4j @RequiredArgsConstructor @Service public class MessageService { @Value("${rabbitmq.exchange.name}") private String exchangeName; @Value("${rabbitmq.routing.key}") private String routingKey; private final RabbitTemplate rabbitTemplate; /** * Queue로 메시지를 발행 * * @param messageDto 발행할 메시지의 DTO 객체 */ public void sendMessage(MessageDto messageDto) { log.info("message sent: {}", messageDto.toString()); rabbitTemplate.convertAndSend(exchangeName, routingKey, messageDto); } /** * Queue에서 메시지를 구독 * * @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체 */ @RabbitListener(queues = "${rabbitmq.queue.name}") public void reciveMessage(MessageDto messageDto) { log.info("Received message: {}", messageDto.toString()); } }
MessageDto.java
메시지를 보내기 위한 DTO 클래스를 추가하세요.
package com.example.rabbitmqsample.dto; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; @Getter @Setter @ToString @AllArgsConstructor @NoArgsConstructor public class MessageDto { private String title; private String content; }
5. RabbitMQ Controller 추가
클라이언트로부터 메시지를 받아 해당 메시지를 발행할 수 있도록 간단한 REST API 추가하세요.
MessageController.java
package com.example.rabbitmqsample.contoller; import com.example.rabbitmqsample.dto.MessageDto; import com.example.rabbitmqsample.service.MessageService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @Slf4j @RequiredArgsConstructor @RestController public class MessageController { private final MessageService messageService; /** * Queue로 메시지를 발행 * * @param messageDto 발행할 메시지의 DTO 객체 * @return ResponseEntity 객체로 응답을 반환 */ @RequestMapping(value = "/send/message", method = RequestMethod.POST) public ResponseEntity<?> sendMessage(@RequestBody MessageDto messageDto) { messageService.sendMessage(messageDto); return ResponseEntity.ok("Message sent to RabbitMQ!"); } }
6. 메시지 발행 및 구독 확인message.http
### 메시지 발행 POST http://localhost:9091/send/message Content-Type: application/json { "title" : "Message Title Test", "content" : "Message Content Test" }
콘솔 로그 확인
소스 코드는 Github Repository - https://github.com/tychejin1218/message-queue.git 에서 rabbitmq-sample 프로젝트를 참조하세요.
반응형'Spring Boot > 기타' 카테고리의 다른 글
[Spring Boot] ActiveMQ + STOMP 연동 (0) 2023.06.17 [Spring Boot] ActiveMQ 연동하기 (0) 2023.06.11 [Spring Boot] Amazon S3로 파일 업로드 및 삭제 (0) 2023.01.27 [Spring boot] Database가 Replication일 때 DataSource 설정 (2) 2023.01.17 [Spring Boot] Spring Data JPA + QueryDSL 설정 (2) 2023.01.15