个人博客


1、使用场景

  • 淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能。
  • 12306 购票支付确认页面。我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30 分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。

在上面两种场景中,如果我们使用下面3种传统解决方案无疑大大降低了系统的整体性能和吞吐量:

  • 使用 redis 给订单设置过期时间,最后通过判断 redis 中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道 redis 都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力。
  • 使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。
  • 使用 jvm 原生的 DelayQueue ,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。

2、实现方式

RabbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列。这里不做介绍。

RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件rabbitmq-delayed-message-exchange

3、插件安装

  1. rabbitmq-delayed-message-exchange插件官方下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    选择对应版本下载(要和安装的RabbitMQ server版本匹配)。

  2. 找到RabbitMQ的安装路径,将下载的插件放到plugins目录中。比如:

    1
    2
    3
    -rw-r--r-- 1 root root   43377 9月  20 22:54 rabbitmq_delayed_message_exchange-3.8.0.ez
    [root@libai plugins]# pwd
    /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.14/plugins
  3. 启用插件
    使用rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令启用插件。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    [root@libai plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    Enabling plugins on node rabbit@libai:
    rabbitmq_delayed_message_exchange
    The following plugins have been configured:
    rabbitmq_delayed_message_exchange
    rabbitmq_management
    rabbitmq_management_agent
    rabbitmq_web_dispatch
    Applying plugin configuration to rabbit@libai...
    The following plugins have been enabled:
    rabbitmq_delayed_message_exchange
    started 1 plugins.
  4. 查看管理界面
    -w760

查看交换机Type是否有x-delayed-message下拉选项,如果有则表示插件安装已经生效了。

4、应用

配置和依赖这里就不贴出了,可以参考以往Springboot整合RabbitMQ的文章。

4.1、队列和交换机绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class DelayedMessageRabbitConfig {
@Bean
public Queue delayQueue() {
return new Queue("delayQueue", true, false, false);
}

@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayedExchange", "x-delayed-message", true, false, args);
}

@Bean
public Binding bindingDelay() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delayRouting").noargs();
}
}

交换机类型为CustomExchange自定义类型,这里指定为x-delayed-message

4.2、生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
@Slf4j
public class DelayMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;

@PostMapping("/sendDelayedMessage")
public String sendDelayedMessage() {
log.info(DateUtil.now());
rabbitTemplate.convertAndSend("delayedExchange", "delayRouting", "订单取消", message -> {
message.getMessageProperties().setDelay(5000);
return message;
});
return "ok";
}
}

这里指定延迟5秒推送消息。

4.3、消费者

1
2
3
4
5
6
7
8
9
10
@Component
@RabbitListener(queues = "delayQueue")
@Slf4j
public class DelayReceiver {
@RabbitHandler
public void process(String delayMessage) {
log.info(DateUtil.now());
log.info("延迟收到消息:{}", delayMessage);
}
}

4.4、效果

1
2
3
4
2020-09-21 21:35:44
------------------
2020-09-21 21:35:49
延迟收到消息:订单取消

如果开启了消息确认机制,比如确认消息是否发到了交换机(publisher-confirms为true),则可能出现312、NO_ROUTE的提示,忽略即可。

另外这里的消息延迟主要发生在交换机延迟推送消息到队列中,而非队列延迟推送到消费者。

代码地址