본문 바로가기
IT/기타

chapter 8. 비동기 메시지 전송하기 (책. Spring in Action)

by eddie_factory 2022. 1. 28.
반응형

chapter 8. 비동기 메시지 전송하기

 - 비동기 메시지 전송
 - JMS(Java Message Service), RabbitMQ, 카프카(kafka)를 사용해서 메시지 전송하기
 - 브로커에서 메시지 가져오기
 - 메시지 리스닝하기

 

tip. 비동기 메시지는 애플리케이션 간의 결합도를 낮추고 확장성을 높여준다.
JMS(Java Message Service), RabbitMQ, AMQP(Advanced Message Queueing Protocol), 카프카 등이 있음.

 

1. JMS로 메시지 전송하기

- JMS는 두개 이상의 클라이언트 간에 메시지 통신을 위한 공통 API를 정의하는 자바 표준
- 스프링은 JmsTemplate이라는 템플릿 기반의 클래스를 통해 JMS를 지원
- 프로듀서(producer)가 큐와 토픽에 메세지를 전송하고 컨슈머(consumer)는 그 메세지를 받음
- 스프링은 메시지 기반의 POJO도 지원, 큐나 토픽에 도착하는 메시지에 반응해 비동기 방식으로 메세지를 수신하는 자바 객체

 1) JMS 설정하기

  - 사용할 의존성 추가 (Spring in Action에서는 ActiveMQ artemis를 사용함)

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-artemis</artifactId>
   <scope>runtime</scope>
</dependency>

  * Artemis 브로커의 위치와 인증정보를 구성하는 속성 및 yml 설정 (만약 ActiveMQ 사용시 속성정보를 달리 설정해야함)

속성 설명
spring.artemis.host 브로커의 호스트
spring.artemis.port 브로커의 포트
spring.artemis.user 브로커를 사용하기 위한 사용자
spring.artemis.password 브로커를 사용하기 위한 사용자 암호

 

spring:
  artemis:
    host: artemis.tacocloud.com
    port: 61617
    user: tacoweb
    password: password

*브로커란  목적지에 안전하게 메시지를 건네주는 중개자

 

2) JmsTemplate을 사용해서 메시지 전송하기

 - send()와 convertAndSend()의 메서드를 가지고 있으며, 오버로딩해 사용하고 있음
 - send()메서드는 Message 객체를 생성하기위해 MessageCreator를 필요로 함

 - convertAndSend()는 Object타입 객체를 인자로 받아 내부적으로 Message타입으로 변환

 - convertAndSend()가 MessagePostProcessor을 인자로 받을 경우 Message를 커스터마이징 수 있음

 

@Override
public void sendOrder(Order order) {
    jms.send(new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createObjectMessage(order);
        }
    });
}

ex) jms.send() 메서드가 Order객체로 부터 메시지를 생성

      MessageCreator 인터페이스를 구현한 익명의 내부 클래스를 인자로 전달하여 jms.send() 호출

 

위의 메세지는 도착지를 지정하지 않았으므로  제대로 실행되게 하려면 설정파일(application.yml) 파일에 속성을 지정해야 함

spring:
  jms:
    template:
      default-destination: tacocloud.order.queue

도착지를 지정하는 다른 방법 
 -  Destination  Bean을 선언하고 주입한다.

@Bean
public Destination orderQueue() {
    return new ActiveMQQueue("tacocloud.order.queue");
}

// 

private  Destination orderQueue;

@Autowired
public JmsOrderMessagingService(JmsTemplate jms, Destination orderQueue) {
    this.jms = jms;
    this.orderQueue =orderQueue;
}

@Override
public void sendOrder(Order order) {
    jms.send(
            orderQueue,
            session -> session.createObjectMessage(order));
}

 

 -  Destination 객체 대신 도착지 이름을 지정하여 넣음 

@Override
public void sendOrder(Order order) {
    jms.send(
            "tacocloud.order.queue",
            session -> session.createObjectMessage(order));
}

 

메세지 변환기

메세지 변환기 하는일
MappingJackson2HttpMessageConverter 메세지를 JSON형태로 변환
MarshalligMessageConverter 메세지를 XML형태로 변환
MessagingMessageConverter Message 객체로 변환
SimpleMessageConverter 문자열, byte, Map, Serializable객체를 각각 맞는 메세지형태로 변환

 

@Bean
public MappingJackson2MessageConverter messageConverter() {
    MappingJackson2MessageConverter messageConverter =
            new MappingJackson2MessageConverter();
    messageConverter.setTypeIdPropertyName("_typeId");

    Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();
    typeIdMappings.put("order", Order.class);
    messageConverter.setTypeIdMappings(typeIdMappings);

    return messageConverter;
}

수신된  메시지의 변환 타입을 메시지 수신자가 알아야하기 때문에  setTypeIdMappings 메소드에 해당 클래스 타입을 넘겨줘 맞출 수 있도록 해야함.

 

 

후처리 메시지

전송 객체 이외에 추가적으로 메시지를 보내야한다면 커스텀 헤더를 메시지에 추가해서 보낼 수 있음.

jms.send(
        "tacocloud.order.queue",
        session ->  {
            Message message = session.createObjectMessage(order);
            message.setStringProperty("X_ORDER_SOURCE", "WEB");
        });

send()가아닌 convertAndSend()를 사용하면 Message 객체가 내부적으로 생성되므로 접근할 수 없음

다만 MessagePostProcessor를 사용해서 메시지가 전송되기 전에 헤더를 추가할 수 있음

jms.convertAndSend("tacocloud.order.queue", order,
        new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws JMSException {
                message.setStringProperty("X_ORDER_SOURCE", "WEB");

                return message;
            }
        });

메소드 참조를 사용해서 아래와 같이 전송할 수 있음

@Override
public void sendOrder(Order order) {
    jms.convertAndSend("tacocloud.order.queue", order,
            this::addOrderSource);
}

private Message addOrderSource(Message message) throws JMSException {
    message.setStringProperty("X_ORDER_SOURCE", "WEB");
    return message;
}

 

2) JMS 메시지 수신하기

JmsTemplate는 전체 메서드가 pull 모델을 기반으로 하고 있다.  메시지 리스너를 정의해서 pus 모델로 사용 가능함
  - pull 모델 : 메시지를 요청하고 도착할 때까지 기다리는 모델

  - push 모델 : 메시지를 수신 가능한 상태라면 자동으로 전달하는 모델

 

 - JmsTemplate에는 수신을 위해 receive()와 receiveAndConvert()  메서드를 가짐, 이는 send(), sendAndConvertI()에 대응함

 - receive() 메서드는 원시 메시지를 수신

 - receiveAndConvert() 메서드는 메시지를 도메인 타입으로 변환하기 위해 메시지 변환기를 사용

 - 각 메서드는 도착지 객체나 문자열을 지정하거나 기본 도착지를 사용할 수 있음.

private JmsTemplate jms;
private MessageConverter converter;

@Autowired
public JmsOrderReceiver(JmsTemplate jms, MessageConverter converter) {
    this.jms = jms;
    this.converter =  converter;
}

@Override
public Order receiveOrder() {
    Message message = jms.receive("tacocloud.order.queue");
    return (Order) converter.fromMessage(message);
}

receive()메서드를 사용,도착지를 문자열로 지정하여 원시타입의 메시지를 반환 받는다.  수신 메시지의 타입ID 속성은 Order이고, converter를 사용해 변환한 객체 타입은 Object이기 때문에 Order로 캐스팅한 후 반환해야 함

 

private JmsTemplate jms;

public JmsOrderReceiver(JmsTemplate jms) {
    this.jms = jms;
}

@Override
public Order receiveOrder() {
    return (Order) jms.receiveAndConvert("tacocloud.order.queue");
}

receiveAndConvert() 메소드를 사용하면 MessageConverter를 주입하지 않고 수신할 수 있음

 

메시지 리스너 선언하기

리스너를 사용하지 않을 경우  pull 모델로 메시지 수신 메서드를 호출해야하는데 반해, 리스너는 메시지가 도착할 때까지 대기하는 수동적 컴포넌트

@Component
public class OrderListener {

    private KitchenUI ui;

    @Autowired
    public OrderListener(KitchenUI ui) {
        this.ui = ui;
    }

    @JmsListener(destination = "tacocloud.order.queue")
    public void receiveOrder(Order order) {
        ui.displayOrder(order);
    }

메시지 리스너를 선언하려면 @JsmListener를 지정해야 함.

@JsmListener 애노테이션이 지정된 메서드는 JmsTemplate을 사용하지 않으며 애플리케이션 내에서 호출되지 않는다.

대신 스프링 프레임워크 코드가 특정 도착지에 메시지가 도착하는 것을 기다리다가 도착하면 해당 메시지에 적재된 Order 객체가 인자로 전달되면서 receiveOrder() 메서드가 자동 호출 됨.

 

메시지 리스너는 중단 없이 다수의 메시지를 처리할 수 있어 상황에 맞게 사용하여야 함.

 

2. RabbitMQ와 AMQP 사용하기

 - RabbitMQ는 라우팅 전략을 제공함.

 - 수신자가 리스닝하는 큐와 분리된 거래소 이름과 라우팅 키를 주소로 사용.

 - 메시지가 RabbitMQ 브로커에 도착하면 거래소, 바인딩, 라우팅 키값을 기반으로 메시지를 처리함

거래소 종류 설명
기본(default) 브로커가 자동으로 생성하는 거래소. 라우팅키와 이름이 같은 큐로 메시지를 전달한다. 모든 큐는 자동으로 기본거래소와 연결된다.
디렉트(Direct) 바인딩 키가 해당 메시지의 라우팅 키와 같은 큐에 메시지를 전달
토픽(Topic) 바인딩 키가 해당 메시지의 라우팅 키와 일치하는 하나 이상의큐에 메시지를 전달
팬아웃(Fanout) 키와 상관 없이 연결된 모든 큐에 메시지를 전달함
헤더(Header) 토픽 거래소와 유사, 라우팅 키 대신 헤더 값을 기반으로 함
데드레터(Dead letter) 전달 불가능. 거래소- 큐 바인딩과도 일치하지 않는 메시지를 보관하는 거래소

 

1) RabbitMQ - 스프링 설정

dependency 추가

AMQP를 빌드하면 다른 지원 컴포넌트들도 활 할수 있음

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
속성 설명
spring.rabbitmq.addresses 리스트 형태의 RabbitMQ 브로커 주소
spring.rabbitmq.host 브로커의 호스트
spring.rabbitmq.port 브로커의 포트
spring.rabbitmq.username 브로커를 사용하기 위한 사용자 이름
spring.rabbitmq.password 브로커를 사용하기 위한 사용자 암호
spring:
  rabbitmq:
    host: rabbit.tacocloud.com
    port: 5673
    username: tacoweb
    password: tacoweb

 

2) 메세지 전송하기

JmsTemplate 메서드와 달리 RabbitTemplate 메서드는 거래소와 라우팅 키 형태로 전송함

 - send() 메서드는 원시 Message 객체를 전송함

 - convertAndSend() 메서드는 전송에 앞서 내부적으로 변환될 객체를 인자로 받음 또한 MessagePostProcessor 인자를 받아 Message 객체를 조작할 수 있음.

 - Destination 대신, 거래소와 라우팅 키를 지정하는 문자열을 받는다는 점에서 JmsTemplate의 메서드들과 다름

 

public class RabbitOrderMessagingService
        implements OrderMessagingService {

    private RabbitTemplate rabbit;

    @Autowired
    public RabbitOrderMessagingService(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
    }

    public void sendOrder(Order order) {
        MessageConverter converter = rabbit.getMessageConverter();
        MessageProperties props = new MessageProperties();
        Message message = converter.toMessage(order, props);
        rabbit.send("tacocloud.order", message);
    }
}

MessageConverter는 JmsTemplate과 동일하게 객체 변경을 위해 사용함

메시지 속성은 MessageProperties를 사용해서 제공해야 함 (속성을 설정할 필요가 없으면 기본 인스턴스만 있으면 됨)

메시지와 함께 라우팅 키 tacocloud.order를  전달하므로 거래소는 기본거래소가 사용됨.

 

기본거래소의 이름은 빈 문자열인 ""이며, 이것은 RabbitMQ 브로커가 자동으로 생성하는 기본거래소. 기본 라우팅 키도 ""(빈문자열)

spring:
  rabbitmq:
    template:
      exchange: tacocloud.orders
      routing-key: kitchens.central

설정 파일에 입력하여  기본값을 설정할 수 있습니다.

 

메시지변환기 구성하기

메세지 변환기 하는일
Jackson2JsonMessageConverter 메세지를 JSON형태로 변환
MarshalligMessageConverter Marshaller와 Unmarshaller를 사용해서  xml로 변환
SimpleMessageConverter 문자열, byte, Map, Serializable객체를 각각 맞는 메세지형태로 변환
ContentTypeDelegatingMessageConverter contentType 헤더를 기반으로 다른 메시지 변환기에 변환을 위임

메시지 변환기를 변경해야할 때는 MessageConverter 타입의 빈을 구성하면 됨

 

메시지 속성 설정하기

JMS에서처럼 전송하는 메시지의 일부 헤더를 설정해야할 경우 MessageProperties 인스턴스를 통해 헤더를 설정할 수 있다.

MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");

 

convertAndSend()를 사용할 때는 MessagPostProcesso에서 MessageProperties를 선언해서 사용할 수 있음

public void sendOrder(Order order) {
    rabbit.convertAndSend("tacocloud.order.queue", order,
            new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message)
                        throws AmqpException {
                    MessageProperties props = message.getMessageProperties();
                    props.setHeader("X_ORDER_SOURCE", "WEB");
                    return message;
                }
            });
}

 

3) RabbitMQ로부터 메시지 수신하기

 - RabbitTemplate을 사용해서 큐로부터 메시지를 가져옴 (pull)
 - @RabbitListener가 기정된 메서드로 메시지가 push됨

 

RabbitTemplate을 사용해서 메시지 수신하기

receive()와 receiveAndConverter() 메서드가 존재함

수신 메서드에는 거래소나 라우팅 키를 매개변수로 갖지 않는다. 왜냐하면 거래소와 라우팅 키는 메시지를 큐로 전달하는데 사용되지만, 일단 메시지가 큐에 들어가면 다음 메시지 도착지는 큐로부터 메시지를 소비하는 컨슈머이기 때문

메시지를 소비하는 에플리케이션은 거래소 및 라우팅 키를 신경 쓸 필요가 없고 큐만 알고 있으면 된다.

또한 수신 타임아웃을 나타내기 위해 long 타입의 매개변수를 갖음.  수신 할 수 있는 메시지가 없는 경우 null의 값이 반환

타임아웃 값을 지정했더라도 null 값이 반환되는 경우를 대비하여 처리하는 코드 작성 필요

 

public Order receiveOrder() {
    Message message = rabbit.receive("tacocloud.orders");
    return message != null ? (Order) converter.fromMessage(message) : null;
}

수신 구현부는 JMS와 크게 다르지 않다. receive에 큐를 지정하고, 컨버터를 통해 원시 메시지를 변환시켜 줌.

 

Message message = rabbit.receive("tacocloud.orders", 30000);
spring:
  rabbitmq:
    template:
      receive-timeout: 30000

메시지를 수신하는 타임아웃 시간을 매개변수로 지정할 수 있으며, 기본 타임아웃시간을 설정파일에 설정할 수 있음.

 

public Order receiveOrder() {
    return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
}

receiveAndConvert() 메서드를 사용하면 원시메시지를 변환할 필요 없이 바로 리턴할 수 있음

public Order receiveOrder() {
    return rabbit.receiveAndConvert("tacocloud.order.queue", new ParameterizedTypeReference<Order>());
}

ParameterizedTypeReference<T>를 사용하는 방법으로 객체를 수신하게 하는 방법도 있으며, 이방법이 type-safe 측면에선 캐스팅 보다 좋다.

 

리스너를 사용해서 RabbitMQ 메시지 처리하기

JMSListener와 동일하게 애노테이션을 사용하여 리스너를 작성 한다.

@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(Order order) {
    ui.displayOrder(order);
}

 

 

3. 카프카 사용하기

아파치 카프카는 RabbitMQ와 유사한 메시지 브로커지만 특유의 아키텍처를 가짐

 - 높은 확장성을 제공하는 클러스터로 실행되도록 설계

 - 클러스터의 모든 인스턴스에 걸쳐 토픽을 파티션으로 분할하여 메시지를 관리

 - RabbitMQ가 거래소와 큐를 사용해서 메시지를 처리하는 반면, 카프카는 토픽만 사용

 

카프카의 토픽은 클러스터의 모든 브로커에 걸쳐 복제됨. 클러스터의 각 노드는 하나 이상의 토픽에 대한 리더로 동작하며, 토픽 데이터를 관리하고 다른 노드로 데이터를 복제함.

* 카프카의 토픽은 저장소라고 라고 이해

 

1) 카프카 사용을 위한 스프링 설정

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>
spring:
  kafka:
    bootstrap-servers:
      - localhost:9092

dependency 추가 및 yml 설정

bootstrap-servers 속성에는 카프카 클러스터로의 초기 연결에 사용되는 하나 이상의 카프카 서버들의 위치를 설정함

이는 복수형이며, 여러 서버를 지정할 수 있음

 

2) KafkaTemplate을 사용해서 메시지 전송하기
KafkaTemplate의 가장큰 차이점은 converterAndSend() 메서드가 없다는 것

 - KafkaTemplate은 제네릭 타입을 사용하고, 메시지를 전송할 때 직접 도메인 타입을 처리할수 있기 때문

 - send() 메서드가 converterAndSend()의 기능을 갖고 있다고 생각하면 됨

KafkaTemplate는 토픽과 페이로드가 가장 중요한 매개변수 

 * 페이로드 : 메시지에 적재된 데이터이며 필수 값

@Service
public class KafkaOrderMessagingService
        implements OrderMessagingService {

    private KafkaTemplate<String, Order> kafkaTemplate;

    @Autowired
    public KafkaOrderMessagingService(
            KafkaTemplate<String, Order> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void sendOrder(Order order) {
        kafkaTemplate.send("tacocloud.orders.topic", order);
    }

}

 JMS와 RabbitMQ와 유사한 형태의 send()메서드를 통해 메시지를 전송하며, 만약 yml설정에 기본 토픽을 설정해둔다면 sendDefault()메서드를 호출. 이때 토픽은 매개변수로 전달하지 않아도 됨

 

3) 카프카 리스너 작성하기

스프링을 사용해서 카프카 토픽의 메시지를 가져오는 유일한 방법은 메시지 리스너를 작성하는 방법 뿐

@KafkaListener 애노테이션을 통해 메서드를 정의

@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order) {
    ui.displayOrder(order);
}

기본적으로는 페이로드인 Order객체만 handle의 인자로 받음

메시지의 추가적인 메타데이터가 필요하다면 ConsumerRecord나 Message 객체도 인자로 받을 수 있음.

 

@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<String, Order> record) {
    log.info("Received from partition {} with timestamp {}",
            record.partition(), record.timestamp());

    ui.displayOrder(order);
}

예를 들어, 메시지의 파티션과 타임스탬프롤 로깅하기 위해 ConsumerRecord를 인자로 받을 수 있음

메시지 페이로드는 ConsumerRecord.value()나 Message.getPayload()를 사용해서 받을 수도 있다.

 

 

요약

 - 애플리케이션 간 비동기 메시지 큐를 이용한 통신방식은 간접 계층을 제공하므로 애플리케이션 간의 결합도는 낮츠고 확장성은 높임 

반응형

댓글