基于ActiveMQ的请求-应答模式

时间:2022-07-24
本文章向大家介绍基于ActiveMQ的请求-应答模式,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

基于ActiveMQ的请求-应答模式

一. 使用场景

基于ActiveMQ的请求-应答模式,相当于通过消息队列,请求端注册了一个异步回调,在发送消息时指定回调消息的目的地和关联的id,这样应答端在收到请求消息时,可以在处理后,将处理结果的应答消息发送到回调的目的地中。

二. 代码实例

首先是请求和响应的消息定义:

/**
 * @Auther: ZhangShenao
 * @Date: 2019/2/12 18:40
 * @Description:请求消息
 */
@Getter
@Setter
@ToString
public class RequestDto {
    //业务id
    private String uid;

    //业务数据
    private String payload;
}
/**
 * @Auther: ZhangShenao
 * @Date: 2019/2/12 18:33
 * @Description:响应消息
 */
@Getter
@Setter
@ToString
public class ResponseDto {
    //业务id
    private String uid;

    //处理结果
    private boolean success;
}

请求端:

/**
 * @Auther: ZhangShenao
 * @Date: 2019/2/12 18:09
 * @Description:Request-Response模式的请求端
 */
@Service
public class Producer {
    //保存所有请求的业务id和响应结果
    private static final Map<String, Boolean> replies = new ConcurrentHashMap<>();

    @Autowired
    @Qualifier(Constants.JMS_QUEUE_TEMPLATE)
    private JmsTemplate queueTemplat;

    public void sendMessage(RequestDto dto) {
        queueTemplat.send(Constants.QueueNames.REQUEST_QUEUE, session -> {
            TextMessage message = session.createTextMessage(dto.getPayload());

            //设置消息关联id,将请求和应答消息关联起来
            message.setJMSCorrelationID(dto.getUid());

            //设置消息回复的目的地
            message.setJMSReplyTo(new ActiveMQQueue(Constants.QueueNames.RESPONSE_QUEUE));

            //记录发送的请求
            replies.putIfAbsent(dto.getUid(), false);
            return message;
        });
    }

    @JmsListener(destination = Constants.QueueNames.RESPONSE_QUEUE, containerFactory = Constants.QUEUE_LISTENER_CONTAINER_FACTORY)
    public void onReply(ResponseDto dto) {
        replies.put(dto.getUid(), dto.isSuccess());
        System.err.println("On Reply: " + dto);
    }
}

响应端:

/**
 * @Auther: ZhangShenao
 * @Date: 2019/2/12 18:42
 * @Description:Request-Response模式的响应端
 */
@Service
public class Consumer {
    @Autowired
    @Qualifier(Constants.JMS_QUEUE_TEMPLATE)
    private JmsTemplate queueTemplat;

    @JmsListener(destination = Constants.QueueNames.REQUEST_QUEUE, containerFactory = Constants.QUEUE_LISTENER_CONTAINER_FACTORY)
    public void onRequest(TextMessage message) throws JMSException {
        //获取消息回复目的地和关联id,向回复目的地发送回复消息
        Destination replyTo = message.getJMSReplyTo();
        ResponseDto response = new ResponseDto();
        response.setUid(message.getJMSCorrelationID());
        response.setSuccess(true);
        queueTemplat.convertAndSend(replyTo,response);
    }
}