-
[Spring Boot] ActiveMQ Queue, Topic 연동Spring Boot/기타 2023. 7. 2. 18:14반응형
ActiveMQ란?
- Apache Software Foundation에서 개발된 오픈 소스 메시지 브로커(Message Broker)로 메시징 시스템에서 메시지를 중개하고 전달하는 역할을 담당합니다.
- Java로 작성되었으며, Java Message Service (JMS)를 지원하여 다양한 애플리케이션 간에 비동기적인 통신을 가능하게 합니다.
- 큐(Queue)와 토픽(Topic)이라는 두 가지 메시지 전달 방식을 지원하며, 큐(Queue) 방식은 메시지를 수신 대기 중인 클라이언트 중 하나에게 전달하고, 토픽(Topic) 방식은 메시지를 여러 구독자에게 전달하는 방식입니다.
ActiveMQ : https://activemq.apache.orghttps://activemq.apache.org
1. Docker를 사용하여 ActiveMQ 설치
Docker를 사용하여 ActiveMQ를 로컬에 설치하세요.
docker run -p 61616:61616 -p 8161:8161 rmohr/activemq
Docker Hub : https://hub.docker.com/r/rmohr/activemq
포트 61616는 ActiveMQ 브로커(Broker) 연결, 포트 8161는ActiveMQ 웹 관리 콘솔에 사용됩니다.
- 웹 관리 콘솔 : http://localhost:8161
- 기본 계정 및 비밀번호 : admin/admin
ActiveMQ 웹 관리 콘솔
2. 의존성 추가Apache ActiveMQ를 사용하기 위해 'Spring Boot Starter ActiveMQ'를 추가하세요.
Maven Repository : https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-activemq
build.gradle
plugins { id 'java' id 'org.springframework.boot' version '3.1.0' id 'io.spring.dependency-management' version '1.1.0' } group = 'com.example' version = '0.0.1-SNAPSHOT' sourceCompatibility = '17' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { // Spring Boot implementation 'org.springframework.boot:spring-boot-starter-web' testImplementation 'org.springframework.boot:spring-boot-starter-test' // ActiveMQ implementation 'org.springframework.boot:spring-boot-starter-activemq' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testCompileOnly 'org.projectlombok:lombok' testAnnotationProcessor 'org.projectlombok:lombok' } tasks.named('test') { useJUnitPlatform() }
3. ActiveMQ 설정- JMS(Java Message Service) 클라이언트와 ActiveMQ 메시지 브로커(Message Broker) 간의 연결을 위해 ActiveMQConnectionFactory을 설정하세요.
- 큐(Queue)와 토픽(Topic)에 메시지를 발행하고 수신하기 위해 JmsTemplate을 설정하고, 메시지를 구독하기 위해 JmsListenerContainerFactory를 설정하세요.
- JMS(Java Message Service) 메시지와 JSON 형식의 메시지 간의 변환을 위해 MessageConvert를 설정하세요.
ActiveMQConfig.java
- JmsTemplate의 setPubSubDomain(boolean pubSubDomain) 메서드는 JmsTemplate의 메시지 전송 방식을 결정합니다.
- pubSubDomain 매개변수를 true로 설정하면, JmsTemplate은 Pub-Sub(Domain) 방식으로 동작합니다. Pub-Sub 방식은 Topic을 사용하여 메시지를 발행하고 구독하는 방식을 의미합니다.
- pubSubDomain 매개변수를 false로 설정하면, JmsTemplate은 Point-to-Point(P2P) 방식으로 동작합니다. Point-to-Point 방식은 Queue를 사용하여 메시지를 발행하고 수신하는 방식을 의미합니다.
package com.example.activemq.config; import com.example.activemq.dto.MessageDto; import java.util.HashMap; import java.util.Map; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.converter.MappingJackson2MessageConverter; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.converter.MessageType; @Configuration public class ActiveMQConfig { @Value("${spring.activemq.broker-url}") private String activemqBrokerUrl; @Value("${spring.activemq.user}") private String activemqUsername; @Value("${spring.activemq.password}") private String activemqPassword; /** * ActiveMQ 연결을 위한 ActiveMQConnectionFactory 빈을 생성하여 반환 * * @return ActiveMQConnectionFactory 객체 */ @Bean public ActiveMQConnectionFactory activeMQConnectionFactory() { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setBrokerURL(activemqBrokerUrl); activeMQConnectionFactory.setUserName(activemqUsername); activeMQConnectionFactory.setPassword(activemqPassword); return activeMQConnectionFactory; } /** * Queue JmsTemplate을 생성하여 반환 * * @return JmsTemplateQueue 객체 */ @Bean public JmsTemplate jmsTemplateQueue() { JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory()); jmsTemplate.setMessageConverter(jacksonJmsMessageConverter()); jmsTemplate.setExplicitQosEnabled(true); // 메시지 전송 시 QOS을 설정 jmsTemplate.setDeliveryPersistent(false); // 메시지의 영속성을 설정 jmsTemplate.setReceiveTimeout(1000 * 3); // 메시지를 수신하는 동안의 대기 시간을 설정(3초) jmsTemplate.setTimeToLive(1000 * 60 * 30); // 메시지의 유효 기간을 설정(30분) return jmsTemplate; } /** * Queue JmsListenerContainerFactory을 위한 빈을 생성 * * @return JmsTemplate */ @Bean public JmsListenerContainerFactory<?> containerFactoryQueue() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(activeMQConnectionFactory()); factory.setMessageConverter(jacksonJmsMessageConverter()); return factory; } /** * Topic JmsTemplate을 생성하여 반환 * * @return JmsTemplateQueue 객체 */ @Bean public JmsTemplate jmsTemplateTopic() { JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory()); jmsTemplate.setPubSubDomain(true); jmsTemplate.setMessageConverter(jacksonJmsMessageConverter()); jmsTemplate.setExplicitQosEnabled(true); // 메시지 전송 시 QOS을 설정 jmsTemplate.setDeliveryPersistent(false); // 메시지의 영속성을 설정 jmsTemplate.setReceiveTimeout(1000 * 3); // 메시지를 수신하는 동안의 대기 시간을 설정(3초) jmsTemplate.setTimeToLive(1000 * 60); // 메시지의 유효 기간을 설정(1분) return jmsTemplate; } /** * Topic JmsListenerContainerFactory을 위한 빈을 생성 * * @return JmsTemplate */ @Bean public JmsListenerContainerFactory<?> containerFactoryTopic() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(true); factory.setConnectionFactory(activeMQConnectionFactory()); factory.setMessageConverter(jacksonJmsMessageConverter()); return factory; } /** * MessageConverter을 위한 빈을 생성 * * @return MessageConverter */ @Bean public MessageConverter jacksonJmsMessageConverter() { MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); converter.setTargetType(MessageType.TEXT); converter.setTypeIdPropertyName("_typeId"); Map<String, Class<?>> typeIdMappings = new HashMap<>(); typeIdMappings.put("message", MessageDto.class); converter.setTypeIdMappings(typeIdMappings); return converter; } }
application.yml
ActiveMQ 연결 정보 및 Queue, Topic 정보를 추가하세요.# Service Post 설정 server: port: 9091 # Log Level 설정 logging: level: root: info # ActiveMQ 연결 정보 spring: activemq: broker-url: tcp://localhost:61616 user: admin password: admin # Activemq Queue, Topic 정보 activemq: queue.name: sample-queue topic.name: sample-topic
4. ActiveMQ Service, Dto 추가
Queue, Topic으로 메시지를 발행할 때는 JmsTemplate 클래스에 convertAndSend 메서드를 사용하고, 메시지를 구독할 때는 하는 @JmsListener 어노테이션을 사용하여 메서드를 구현하세요.
MessageServiceQueue.java
jmsTemplateQueue 빈을 사용하여 메시지를 발행하는 메서드와 containerFactoryQueue 빈을 사용하여 메시지를 구독하는 메서드를 구현하세요.package com.example.activemq.service; import com.example.activemq.dto.MessageDto; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; @Slf4j @RequiredArgsConstructor @Service public class MessageQueueService { @Value("${activemq.queue.name}") private String queueName; private final JmsTemplate jmsTemplateQueue; /** * Queue로 메시지를 발행 * * @param messageDto 발행할 메시지의 DTO 객체 */ public void sendMessageQueue(MessageDto messageDto) { log.info("Message sent to queue: {}", messageDto.toString()); jmsTemplateQueue.convertAndSend(queueName, messageDto); } /** * Queue에서 메시지를 구독 * * @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체 */ @JmsListener(destination = "${activemq.queue.name}", containerFactory = "containerFactoryQueue") public void reciveMessageQueue(MessageDto messageDto) { log.info("Received message from queue: {}", messageDto.toString()); } }
MessageServiceTopic.java
jmsTemplateTopic 빈을 사용하여 메시지를 발행하는 메서드와 containerFactoryTopic 빈을 사용하여 메시지를 구독하는 메서드를 구현하고, 하나의 메시지가 여러 구독자에게 전달되는지 확인하기 위해, 메서드를 하나 이상 추가하세요.package com.example.activemq.service; import com.example.activemq.dto.MessageDto; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; @Slf4j @RequiredArgsConstructor @Service public class MessageTopicService { @Value("${activemq.topic.name}") private String topicName; private final JmsTemplate jmsTemplateTopic; /** * Topic으로 메시지를 발행 * * @param messageDto 발행할 메시지의 DTO 객체 */ public void sendMessageTopic(MessageDto messageDto) { log.info("Message sent to topic: {}", messageDto.toString()); jmsTemplateTopic.convertAndSend(topicName, messageDto); } /** * Topic에서 메시지를 구독 * * @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체 */ @JmsListener(destination = "${activemq.topic.name}", containerFactory = "containerFactoryTopic") public void reciveMessageTopic01(MessageDto messageDto) { log.info("Received Message from Topic: {}", messageDto.toString()); } /** * Topic에서 메시지를 구독 * * @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체 */ @JmsListener(destination = "${activemq.topic.name}", containerFactory = "containerFactoryTopic") public void reciveMessageTopic02(MessageDto messageDto) { log.info("Received Message from Topic: {}", messageDto.toString()); } /** * Topic에서 메시지를 구독 * * @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체 */ @JmsListener(destination = "${activemq.topic.name}", containerFactory = "containerFactoryTopic") public void reciveMessageTopic03(MessageDto messageDto) { log.info("Received Message from Topic: {}", messageDto.toString()); } }
MessageDto.java메시지 객체를 전달하기 위해 DTO 클래스를 추가하세요.
package com.example.activemq.dto; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; @Getter @Setter @ToString @AllArgsConstructor @NoArgsConstructor public class MessageDto { private String title; private String content; }
5. ActiveMQ Controller 추가
Queue, Topic으로 메시지를 발행할 수 있도록 간단한 REST API 추가하세요.
MessageController.javapackage com.example.activemq.contoller; import com.example.activemq.dto.MessageDto; import com.example.activemq.service.MessageQueueService; import com.example.activemq.service.MessageTopicService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @Slf4j @RequiredArgsConstructor @RestController public class MessageController { private final MessageQueueService messageQueueService; private final MessageTopicService messageTopicService; /** * Queue로 메시지를 발행 * * @param messageDto 발행할 메시지의 DTO 객체 * @return ResponseEntity 객체로 응답을 반환 */ @RequestMapping(value = "/send/message/queue", method = RequestMethod.POST) public ResponseEntity<?> sendMessageQueue(@RequestBody MessageDto messageDto) { messageQueueService.sendMessageQueue(messageDto); return ResponseEntity.ok("Message sent to Queue!"); } /** * Topic으로 메시지를 발행 * * @param messageDto 발행할 메시지의 DTO 객체 * @return ResponseEntity 객체로 응답을 반환 */ @RequestMapping(value = "/send/message/topic", method = RequestMethod.POST) public ResponseEntity<?> sendMessageTopic(@RequestBody MessageDto messageDto) { messageTopicService.sendMessageTopic(messageDto); return ResponseEntity.ok("Message sent to Topic!"); } }
6. 메시지 발행 및 구독 확인
6_1. message.http 추가
### 큐(Queue)로 메시지 발행 POST http://localhost:9091/send/message/queue Content-Type: application/json { "title" : "[Queue] Message Title ", "content" : "[Queue] Message Content" } ### 토픽(Topic)으로 메시지 발행 POST http://localhost:9091/send/message/topic Content-Type: application/json { "title" : "[Topic] Message Title", "content" : "[Topic] Message Content" }
6_3. ActiveMQ 웹 관리 콘솔
큐(Queue)
토픽(Queue)
6_3. 콘솔 로그 확인
큐(Queue)
토픽(Queue)
소스 코드는 Github Repository - https://github.com/tychejin1218/message-queue.git 에서 active-sample-01 프로젝트를 참조하세요.반응형'Spring Boot > 기타' 카테고리의 다른 글
[Spring Boot] RedisJSON 연동 (0) 2023.07.17 [Spring Boot] RedisTemplate을 이용한 Redis Data Type 확인 (0) 2023.07.02 [Spring Boot] STOMP 사용 시 Interceptor 및 errorHandling 적용 (0) 2023.06.24 [Spring Boot] ActiveMQ + STOMP 연동 (0) 2023.06.17 [Spring Boot] ActiveMQ 연동하기 (0) 2023.06.11