ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spring Boot] ActiveMQ Queue, Topic 연동
    Spring Boot/기타 2023. 7. 2. 18:14
    반응형

     

    ActiveMQ란?

    • Apache Software Foundation에서 개발된 오픈 소스 메시지 브로커(Message Broker)로 메시징 시스템에서 메시지를 중개하고 전달하는 역할을 담당합니다.
    • Java로 작성되었으며, Java Message Service (JMS)를 지원하여 다양한 애플리케이션 간에 비동기적인 통신을 가능하게 합니다.
    • 큐(Queue)와 토픽(Topic)이라는 두 가지 메시지 전달 방식을 지원하며, 큐(Queue) 방식은 메시지를 수신 대기 중인 클라이언트 중 하나에게 전달하고, 토픽(Topic) 방식은 메시지를 여러 구독자에게 전달하는 방식입니다. 

     ActiveMQ : https://activemq.apache.orghttps://activemq.apache.org

     

    1. Docker를 사용하여 ActiveMQ 설치

    Docker를 사용하여 ActiveMQ를 로컬에 설치하세요.

    docker run -p 61616:61616 -p 8161:8161 rmohr/activemq

    Docker Hub : https://hub.docker.com/r/rmohr/activemq

     

    포트 61616는 ActiveMQ 브로커(Broker) 연결, 포트 8161는ActiveMQ 웹 관리 콘솔에 사용됩니다.

    • 웹 관리 콘솔 : http://localhost:8161
    • 기본 계정 및 비밀번호 : admin/admin

     

    ActiveMQ 웹 관리 콘솔


    2. 의존성 추가

    Apache ActiveMQ를 사용하기 위해 'Spring Boot Starter ActiveMQ'를 추가하세요.

    Maven Repository : https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-activemq

    build.gradle

    plugins {
        id 'java'
        id 'org.springframework.boot' version '3.1.0'
        id 'io.spring.dependency-management' version '1.1.0'
    }
    
    group = 'com.example'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '17'
    
    configurations {
        compileOnly {
            extendsFrom annotationProcessor
        }
    }
    
    repositories {
        mavenCentral()
    }
    
    dependencies {
        // Spring Boot
        implementation 'org.springframework.boot:spring-boot-starter-web'
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
    
        // ActiveMQ
        implementation 'org.springframework.boot:spring-boot-starter-activemq'
    
        compileOnly 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        testCompileOnly 'org.projectlombok:lombok'
        testAnnotationProcessor 'org.projectlombok:lombok'
    }
    
    tasks.named('test') {
        useJUnitPlatform()
    }


    3. ActiveMQ 설정

    • JMS(Java Message Service) 클라이언트와 ActiveMQ 메시지 브로커(Message Broker) 간의 연결을 위해 ActiveMQConnectionFactory을 설정하세요.
    • 큐(Queue)와 토픽(Topic)에 메시지를 발행하고 수신하기 위해 JmsTemplate을 설정하고, 메시지를 구독하기 위해 JmsListenerContainerFactory를 설정하세요.
    • JMS(Java Message Service) 메시지와 JSON 형식의 메시지 간의 변환을 위해 MessageConvert를 설정하세요.

    ActiveMQConfig.java

    • JmsTemplate의 setPubSubDomain(boolean pubSubDomain) 메서드는 JmsTemplate의 메시지 전송 방식을 결정합니다.
    • pubSubDomain 매개변수를 true로 설정하면, JmsTemplate은 Pub-Sub(Domain) 방식으로 동작합니다. Pub-Sub 방식은 Topic을 사용하여 메시지를 발행하고 구독하는 방식을 의미합니다.
    • pubSubDomain 매개변수를 false로 설정하면, JmsTemplate은 Point-to-Point(P2P) 방식으로 동작합니다. Point-to-Point 방식은 Queue를 사용하여 메시지를 발행하고 수신하는 방식을 의미합니다.
    package com.example.activemq.config;
    
    import com.example.activemq.dto.MessageDto;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
    import org.springframework.jms.support.converter.MessageConverter;
    import org.springframework.jms.support.converter.MessageType;
    
    @Configuration
    public class ActiveMQConfig {
    
      @Value("${spring.activemq.broker-url}")
      private String activemqBrokerUrl;
    
      @Value("${spring.activemq.user}")
      private String activemqUsername;
    
      @Value("${spring.activemq.password}")
      private String activemqPassword;
    
      /**
       * ActiveMQ 연결을 위한 ActiveMQConnectionFactory 빈을 생성하여 반환
       *
       * @return ActiveMQConnectionFactory 객체
       */
      @Bean
      public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(activemqBrokerUrl);
        activeMQConnectionFactory.setUserName(activemqUsername);
        activeMQConnectionFactory.setPassword(activemqPassword);
        return activeMQConnectionFactory;
      }
    
      /**
       * Queue JmsTemplate을 생성하여 반환
       *
       * @return JmsTemplateQueue 객체
       */
      @Bean
      public JmsTemplate jmsTemplateQueue() {
        JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory());
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        jmsTemplate.setExplicitQosEnabled(true);    // 메시지 전송 시 QOS을 설정
        jmsTemplate.setDeliveryPersistent(false);   // 메시지의 영속성을 설정
        jmsTemplate.setReceiveTimeout(1000 * 3);    // 메시지를 수신하는 동안의 대기 시간을 설정(3초)
        jmsTemplate.setTimeToLive(1000 * 60 * 30);  // 메시지의 유효 기간을 설정(30분)
        return jmsTemplate;
      }
    
      /**
       * Queue JmsListenerContainerFactory을 위한 빈을 생성
       *
       * @return JmsTemplate
       */
      @Bean
      public JmsListenerContainerFactory<?> containerFactoryQueue() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
      }
    
      /**
       * Topic JmsTemplate을 생성하여 반환
       *
       * @return JmsTemplateQueue 객체
       */
      @Bean
      public JmsTemplate jmsTemplateTopic() {
        JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory());
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        jmsTemplate.setExplicitQosEnabled(true);    // 메시지 전송 시 QOS을 설정
        jmsTemplate.setDeliveryPersistent(false);   // 메시지의 영속성을 설정
        jmsTemplate.setReceiveTimeout(1000 * 3);    // 메시지를 수신하는 동안의 대기 시간을 설정(3초)
        jmsTemplate.setTimeToLive(1000 * 60);       // 메시지의 유효 기간을 설정(1분)
        return jmsTemplate;
      }
    
      /**
       * Topic JmsListenerContainerFactory을 위한 빈을 생성
       *
       * @return JmsTemplate
       */
      @Bean
      public JmsListenerContainerFactory<?> containerFactoryTopic() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(activeMQConnectionFactory());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
      }
    
      /**
       * MessageConverter을 위한 빈을 생성
       *
       * @return MessageConverter
       */
      @Bean
      public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_typeId");
        Map<String, Class<?>> typeIdMappings = new HashMap<>();
        typeIdMappings.put("message", MessageDto.class);
        converter.setTypeIdMappings(typeIdMappings);
        return converter;
      }
    }

     

    application.yml
    ActiveMQ 연결 정보 및 Queue, Topic 정보를 추가하세요.

    # Service Post 설정
    server:
      port: 9091
    
    # Log Level 설정
    logging:
      level:
        root: info
    
    # ActiveMQ 연결 정보
    spring:
      activemq:
        broker-url: tcp://localhost:61616
        user: admin
        password: admin
    
    # Activemq Queue, Topic 정보
    activemq:
      queue.name: sample-queue
      topic.name: sample-topic

     

    4. ActiveMQ Service, Dto 추가

    Queue, Topic으로 메시지를 발행할 때는 JmsTemplate 클래스에 convertAndSend 메서드를 사용하고, 메시지를 구독할 때는 하는 @JmsListener 어노테이션을 사용하여 메서드를 구현하세요.

    MessageServiceQueue.java
    jmsTemplateQueue 빈을 사용하여 메시지를 발행하는 메서드와 containerFactoryQueue 빈을 사용하여 메시지를 구독하는 메서드를 구현하세요.

    package com.example.activemq.service;
    
    import com.example.activemq.dto.MessageDto;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Service;
    
    @Slf4j
    @RequiredArgsConstructor
    @Service
    public class MessageQueueService {
    
      @Value("${activemq.queue.name}")
      private String queueName;
    
      private final JmsTemplate jmsTemplateQueue;
    
      /**
       * Queue로 메시지를 발행
       *
       * @param messageDto 발행할 메시지의 DTO 객체
       */
      public void sendMessageQueue(MessageDto messageDto) {
        log.info("Message sent to queue: {}", messageDto.toString());
        jmsTemplateQueue.convertAndSend(queueName, messageDto);
      }
    
      /**
       * Queue에서 메시지를 구독
       *
       * @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체
       */
      @JmsListener(destination = "${activemq.queue.name}", containerFactory = "containerFactoryQueue")
      public void reciveMessageQueue(MessageDto messageDto) {
        log.info("Received message from queue: {}", messageDto.toString());
      }
    }


    MessageServiceTopic.java
    jmsTemplateTopic 빈을 사용하여 메시지를 발행하는 메서드와 containerFactoryTopic 빈을 사용하여 메시지를 구독하는 메서드를 구현하고, 하나의 메시지가 여러 구독자에게 전달되는지 확인하기 위해, 메서드를 하나 이상 추가하세요.

    package com.example.activemq.service;
    
    import com.example.activemq.dto.MessageDto;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Service;
    
    @Slf4j
    @RequiredArgsConstructor
    @Service
    public class MessageTopicService {
    
      @Value("${activemq.topic.name}")
      private String topicName;
    
      private final JmsTemplate jmsTemplateTopic;
    
      /**
       * Topic으로 메시지를 발행
       *
       * @param messageDto 발행할 메시지의 DTO 객체
       */
      public void sendMessageTopic(MessageDto messageDto) {
        log.info("Message sent to topic: {}", messageDto.toString());
        jmsTemplateTopic.convertAndSend(topicName, messageDto);
      }
    
      /**
       * Topic에서 메시지를 구독
       *
       * @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체
       */
      @JmsListener(destination = "${activemq.topic.name}", containerFactory = "containerFactoryTopic")
      public void reciveMessageTopic01(MessageDto messageDto) {
        log.info("Received Message from Topic: {}", messageDto.toString());
      }
    
      /**
       * Topic에서 메시지를 구독
       *
       * @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체
       */
      @JmsListener(destination = "${activemq.topic.name}", containerFactory = "containerFactoryTopic")
      public void reciveMessageTopic02(MessageDto messageDto) {
        log.info("Received Message from Topic: {}", messageDto.toString());
      }
    
      /**
       * Topic에서 메시지를 구독
       *
       * @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체
       */
      @JmsListener(destination = "${activemq.topic.name}", containerFactory = "containerFactoryTopic")
      public void reciveMessageTopic03(MessageDto messageDto) {
        log.info("Received Message from Topic: {}", messageDto.toString());
      }
    }


    MessageDto.java

    메시지 객체를 전달하기 위해 DTO 클래스를 추가하세요.

    package com.example.activemq.dto;
    
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.Setter;
    import lombok.ToString;
    
    @Getter
    @Setter
    @ToString
    @AllArgsConstructor
    @NoArgsConstructor
    public class MessageDto {
    
      private String title;
      private String content;
    }

     

    5. ActiveMQ Controller 추가

    Queue, Topic으로 메시지를 발행할 수 있도록 간단한 REST API 추가하세요.
    MessageController.java

    package com.example.activemq.contoller;
    
    import com.example.activemq.dto.MessageDto;
    import com.example.activemq.service.MessageQueueService;
    import com.example.activemq.service.MessageTopicService;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    @Slf4j
    @RequiredArgsConstructor
    @RestController
    public class MessageController {
    
      private final MessageQueueService messageQueueService;
      private final MessageTopicService messageTopicService;
    
      /**
       * Queue로 메시지를 발행
       *
       * @param messageDto 발행할 메시지의 DTO 객체
       * @return ResponseEntity 객체로 응답을 반환
       */
      @RequestMapping(value = "/send/message/queue", method = RequestMethod.POST)
      public ResponseEntity<?> sendMessageQueue(@RequestBody MessageDto messageDto) {
        messageQueueService.sendMessageQueue(messageDto);
        return ResponseEntity.ok("Message sent to Queue!");
      }
    
      /**
       * Topic으로 메시지를 발행
       *
       * @param messageDto 발행할 메시지의 DTO 객체
       * @return ResponseEntity 객체로 응답을 반환
       */
      @RequestMapping(value = "/send/message/topic", method = RequestMethod.POST)
      public ResponseEntity<?> sendMessageTopic(@RequestBody MessageDto messageDto) {
        messageTopicService.sendMessageTopic(messageDto);
        return ResponseEntity.ok("Message sent to Topic!");
      }
    }

     

    6. 메시지 발행 및 구독 확인

    6_1. message.http 추가

    ### 큐(Queue)로 메시지 발행
    POST http://localhost:9091/send/message/queue
    Content-Type: application/json
    
    {
        "title" : "[Queue] Message Title ",
        "content" : "[Queue] Message Content"
    }
    
    ### 토픽(Topic)으로 메시지 발행
    POST http://localhost:9091/send/message/topic
    Content-Type: application/json
    
    {
      "title" : "[Topic] Message Title",
      "content" : "[Topic] Message Content"
    }

     

    6_3. ActiveMQ 웹 관리 콘솔

    큐(Queue) 

     

    토픽(Queue) 

     

    6_3. 콘솔 로그 확인

    큐(Queue) 

     

    토픽(Queue) 



    소스 코드는 Github Repository - https://github.com/tychejin1218/message-queue.git 에서 active-sample-01 프로젝트를 참조하세요.






     

    반응형

    댓글

Designed by Tistory.