个人博客
进入死信队列的场景:
- 消息被拒绝
(basic.reject / basic.nack)
并且 requeue = false
; - 消息
TTL
过期(在RabbitMQ
3.5.8版本之前,实现消息的延迟发送就是依靠消息过期进入死信队列然后进行消费来完成的); - 队列达到最大长度;
1、使用原生API实现死信队列DLX的应用
1.1、生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Slf4j public class ProducerDLX { public static final String HOST = "148.70.153.63"; public static final String USER_NAME = "libai";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setPort(AMQP.PROTOCOL.PORT); connectionFactory.setUsername(USER_NAME); connectionFactory.setPassword(System.getProperty("password")); connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);
@Cleanup Connection connection = connectionFactory.newConnection(); @Cleanup Channel channel = connection.createChannel();
String dlxExchangeName = "DLXExchange", dlxQueueName = "DLXQueue", dlxRoutingKey = "DLX"; channel.exchangeDeclare(dlxExchangeName, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(dlxQueueName, true, false, false, null); channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
String exchangeName = "amq.direct", queueName = "TestDLXQueue", routingKey = "DLX"; Map<String, Object> argMap = new HashMap<>(); argMap.put("x-message-ttl", 30 * 1000); argMap.put("x-dead-letter-exchange", dlxExchangeName); channel.queueDeclare(queueName, true, false, false, argMap); channel.queueBind(queueName, exchangeName, routingKey);
String msg = "测试死信队列"; log.info("now:[{}],发送消息:[{}]", DateUtil.now(), msg); channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); } }
|
- 创建死信队列
DLXQueue
并和指定交换机DLXExchange
进行绑定(其实也是普通的队列、普通的交换机)。 - 创建另外一个正常的消息队列
TestDLXQueue
,设置队列的TTL
过期时间,同时通过x-dead-letter-exchange
属性指定死信交换机DLXExchange
。
1.2、测试消息过期进入死信队列
运行main
函数,推送消息给TestDLXQueue
队列。可以先看到消息先在TestDLXQueue
队列中。
等到30秒后没有被消费,则会把消息推送到DLXQueue
死信队列中。
1.3、死信队列的消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Slf4j public class ConsumerDLX { public static final String HOST = "148.70.153.63"; public static final String USER_NAME = "libai";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setPort(AMQP.PROTOCOL.PORT); connectionFactory.setUsername(USER_NAME); connectionFactory.setPassword(System.getProperty("password")); connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);
Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { log.info("now:[{}],消费消息:[{}]", DateUtil.now(), new String(body)); } }; channel.basicConsume("DLXQueue", true, consumer); } }
|
1.4、运行测试
1 2
| now:[2020-09-28 20:00:10],发送消息:[测试死信队列] now:[2020-09-28 20:00:40],消费消息:[测试死信队列]
|
主要过程:
生产者 —> 原交换机amq.direct
—> 原队列TestDLXQueue
(超过 TTL 之后) —> 死信交换机DLXExchange
—> 死信队列DLXQueue
—> 最终消费者。
2、Springboot整合RabbitMQ实现死信队列DLX的应用
2.1、配置死信队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Bean public Queue DLXQueue() { return new Queue("DLX_QUEUE", true, false, false); }
@Bean public DirectExchange DLXExchange() { return new DirectExchange("DLX_EXCHANGE", true, false); }
@Bean public Binding bindingDLX() { return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX"); }
|
创建死信队列DLX_QUEUE
并和指定交换机DLX_EXCHANGE
进行绑定(其实也是普通的队列、普通的交换机)。
2.2、配置消息队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Bean public Queue testDLXQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); map.put("x-dead-letter-exchange", "DLX_EXCHANGE"); return new Queue("TEST_DLX_QUEUE", true, false, false, map); }
@Bean public DirectExchange testDLXExchange() { return new DirectExchange("TEST_DLX_EXCHANGE", true, false); }
@Bean public Binding bindingTestDLX() { return BindingBuilder.bind(testDLXQueue()).to(testDLXExchange()).with("DLX"); }
|
创建另外一个正常的消息队列TEST_DLX_QUEUE
,设置队列的TTL
过期时间,同时通过x-dead-letter-exchange
属性指定死信队列对应的交换机。
2.3、生产者
1 2 3 4 5 6 7 8 9 10 11
| @RestController public class DLXController { @Autowired private RabbitTemplate rabbitTemplate;
@PostMapping("/testDLX") public String testDLX() { rabbitTemplate.convertAndSend("TEST_DLX_EXCHANGE", "DLX", "测试死信队列"); return "ok"; } }
|
等到30秒后没有被消费,则会把消息推送到DLX_QUEUE
死信队列中。
3、死信队列实现消息延迟发送的缺点
- 如果统一用队列来设置消息的
TTL
,当梯度非常多的情况下,比如 1 分钟,2 分钟,5 分钟,10 分钟,20 分钟,30 分钟……需要创建很多队列来路由消息。 - 如果单独设置消息的
TTL
,则可能会造成队列中的消息阻塞,即前一条消息没有出队(没有被消费),后面的消息无法投递。比如第一条消息过期TTL
是30min,第二条消息TTL
是10min。10分钟后,即使第二条消息应该投递了,但是由于第一条消息还未出队,所以无法投递。 - 可能存在一定时间误差。
所以在RabbitMQ
3.5.8版本之后,可以利用官方的rabbitmq-delayed-message-exchange
插件来实现消息的延迟发送,可以避免上面所说的问题。
RabbitMQ实现消息延迟推送
参考链接
代码地址