ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spring Boot] STOMP 사용 시 Interceptor 및 errorHandling 적용
    Spring Boot/기타 2023. 6. 24. 17:49
    반응형

    STOMP 사용 시  Interceptor 및 errorHandling 적용

    WebSocketConfig.java 수정

    • setErrorHandler() 메서드는 StompEndpointRegistry 클래스에서 제공되는 메서드로, STOMP 엔드포인트 등록 중에 발생하는 오류 처리를 설정하는데 사용할 수 있습니다.
    • configureClientInboundChannel() 메서드는 WebSocketMessageBrokerConfigurer 인터페이스에서 제공되는 메서드로, 클라이언트로부터 수신한 STOMP 메시지를 처리하기 위한 클라이언트 인바운드 채널을 구성하는데 사용할 수 있습니다.
    package com.example.activemq.config;
    
    import com.example.activemq.common.stomp.StompErrorHandler;
    import com.example.activemq.common.stomp.StompPreHandler;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.simp.config.ChannelRegistration;
    import org.springframework.messaging.simp.config.MessageBrokerRegistry;
    import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
    import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
    import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
    
    @Slf4j
    @RequiredArgsConstructor
    @EnableWebSocketMessageBroker // WebSocket 메시지 브로커를 활성화
    @Configuration
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
      private final StompPreHandler stompPreHandler;
      private final StompErrorHandler stompErrorHandler;
    
      @Value("${spring.activemq.stomp.host}")
      private String activemqStompHost;
    
      @Value("${spring.activemq.stomp.port}")
      private String activemqStompPort;
    
      @Value("${spring.activemq.user}")
      private String activemqUsername;
    
      @Value("${spring.activemq.password}")
      private String activemqPassword;
    
      /**
       * STOMP 관련 설정을 구성
       */
      @Override
      public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws") // WebSocket 엔드포인트 설정
            .setAllowedOriginPatterns("*")
            .withSockJS(); // withSockJS()를 사용하여 SockJS를 활성화
        registry.setErrorHandler(stompErrorHandler);
      }
    
      /**
       * 메시지 브로커 관련 설정을 구성
       */
      @Override
      public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/queue") // 메시지를 구독할 경로를 설정
            // ActiveMQ 브로커와 연결을 위한 호스트, 가상 호스트 및 포트, 관리자 로그인 설정
            .setRelayHost(activemqStompHost)
            .setRelayPort(Integer.parseInt(activemqStompPort))
            .setSystemLogin(activemqUsername)
            .setSystemPasscode(activemqPassword)
            .setClientLogin(activemqUsername)
            .setClientPasscode(activemqPassword);
      }
    
      /**
       * 메시지 요청/응답에 관련된 인터셉터를 추가
       */
      @Override
      public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(stompPreHandler);
      }
    }

     

    StompPreHandler.java 추가

    • ChannelInterceptor 인터페이스를 구현하며, 메시지가 채널로 전송되기 전에 실행되는 preSend() 메서드를 오버라이드하여 메시지를 사전 처리합니다.
    package com.example.activemq.common.stomp;
    
    import java.util.List;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageDeliveryException;
    import org.springframework.messaging.simp.stomp.StompCommand;
    import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
    import org.springframework.messaging.support.ChannelInterceptor;
    import org.springframework.util.CollectionUtils;
    
    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class StompPreHandler implements ChannelInterceptor {
    
      @Value("${api-key.name}")
      private String apiKeyName;
    
      @Value("${api-key.value}")
      private String apiKeyValue;
    
      /**
       * 메시지가 채널로 전송되기 전에 실행
       *
       * @param message 메시지 객체
       * @param channel 메시지 채널
       * @return 수정된 메시지 객체
       */
      @Override
      public Message<?> preSend(Message<?> message, MessageChannel channel) {
    
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
    
        // 메시지의 구독 명령이 CONNECT인 경우에만 실행
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
          StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message);
    
          // 메시지의 헤더에서 'X-API-KEY'으로 지정된 헤더 값을 가져와 'B09ED8799B61ABE6331D3DA5BCFD72ADAFAC5329C3990B3F4075687CAE532CC6'와 비교
          List<String> headers = headerAccessor.getNativeHeader(apiKeyName);
          if (CollectionUtils.isEmpty(headers) || !apiKeyValue.equals(headers.get(0))) {
            throw new MessageDeliveryException("UNAUTHORIZED");
          }
        }
    
        return message;
      }
    }

     

    StompErrorHandler.java 추가 

    • StompSubProtocolErrorHandler 클래스를 상속하며, 클라이언트 메시지 처리 중에 발생한 오류를 처리하는 역할합니다.
    package com.example.activemq.common.stomp;
    
    import java.nio.charset.StandardCharsets;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.simp.stomp.StompCommand;
    import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.socket.messaging.StompSubProtocolErrorHandler;
    
    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class StompErrorHandler extends StompSubProtocolErrorHandler {
    
      /**
       * 클라이언트 메시지 처리 중에 발생한 오류를 처리
       *
       * @param clientMessage 클라이언트 메시지
       * @param ex 발생한 예외
       * @return 오류 메시지를 포함한 Message 객체
       */
      @Override
      public Message<byte[]> handleClientMessageProcessingError(
          Message<byte[]> clientMessage,
          Throwable ex) {
    
        // 오류 메시지가 "UNAUTHORIZED"인 경우 - throw new MessageDeliveryException("UNAUTHORIZED")
        if ("UNAUTHORIZED".equals(ex.getMessage())) {
          return errorMessage("유효하지 않은 권한입니다.");
        }
    
        return super.handleClientMessageProcessingError(clientMessage, ex);
      }
    
      /**
       * 오류 메시지를 포함한 Message 객체를 생성
       *
       * @param errorMessage 오류 메시지
       * @return 오류 메시지를 포함한 Message 객체
       */
      private Message<byte[]> errorMessage(String errorMessage) {
    
        StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);
        accessor.setLeaveMutable(true);
    
        return MessageBuilder.createMessage(errorMessage.getBytes(StandardCharsets.UTF_8),
            accessor.getMessageHeaders());
      }
    }

     

    application.yml 수정

    • 메시지의 헤더 정보를 추가하세요.
    # Service Post 설정
    server:
      port: 9091
    
    # Log Level 설정
    logging:
      level:
        root: info
    
    # ActiveMQ 연결 정보
    spring:
      activemq:
        broker-url: tcp://localhost:61616
        stomp:
          host: localhost
          port: 61613
        user: admin
        password: admin
    
    # Activemq queue 정보
    activemq:
      queue.name: sample-queue
    
    # API KEY 정보
    api-key:
      name: X-API-KEY
      value: B09ED8799B61ABE6331D3DA5BCFD72ADAFAC5329C3990B3F4075687CAE532CC6

     

    메시지 구독 테스트

    유효하지 않은 경우

    정상적인 경우

     

    [SpringBoot] ActiveMQ + STOMP 연동 - https://tychejin.tistory.com/422

    소스 코드는 Github Repository - https://github.com/tychejin1218/message-queue.git 에서 active-stomp-sample 프로젝트를 참조하세요.

    반응형

    댓글

Designed by Tistory.