个人博客
消息接收的确认机制主要有三种模式:
自动确认AcknowledgeMode.NONE
RabbitMQ
成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch
捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
根据情况确认AcknowledgeMode.AUTO
这也是SpringBoot
集成RabbitMQ
默认的消息确认情况,如果消费消息时有异常抛出,则会拒绝消息,反之如果没有捕获到异常则确认本次消费成功。
手动确认AcknowledgeMode.MANUAL
这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basicAck/basicNack/basicReject
后,RabbitMQ
收到这些消息后,才认为本次投递成功。
1、创建手动确认消息的队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Configuration public class DirectRabbitConfig { @Bean public DirectExchange directExchange() { return new DirectExchange("directExchange", true, false); }
@Bean public Queue manualAckQueue() { return new Queue("manualAckQueue", true, false, false); }
@Bean public Binding bindingDirectForManualAck() { return BindingBuilder.bind(manualAckQueue()).to(directExchange()).with("manualAck"); } }
|
2、手动确认消息的监听实现
2.1、通过配置实现
1 2 3 4 5 6 7 8 9 10 11 12
| spring: rabbitmq: host: 148.70.153.63 port: 5672 username: libai password: password listener: simple: acknowledge-mode: manual default-requeue-rejected: true
|
2.2、配置类实现(更加灵活)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Configuration @Slf4j public class MessageManualAckListenerConfig { @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setQueueNames("manualAckQueue"); container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { log.info("body:\n{}", JSONUtil.toJsonPrettyStr(new String(message.getBody()))); log.info("prop:\n{}", JSONUtil.toJsonPrettyStr(message.getMessageProperties()));
long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); }); return container; } }
|
body:接收的消息内容。
messageProperties:消息的相关属性。
3、发送消息测试手动确认
3.1、调用接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @RestController public class MessageManualAckController { @Autowired private RabbitTemplate rabbitTemplate;
@PostMapping("/manualAck") public String manualAck() { Map<String, Object> map = new HashMap<>(); map.put("messageId", String.valueOf(UUID.randomUUID())); map.put("messageData", "manualAck"); map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); rabbitTemplate.convertAndSend("directExchange", "manualAck", JSONUtil.toJsonStr(map)); return "ok"; } }
|
3.2、查看控制台打印输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| 2020-09-17 22:35:28,635 [INFO] [simpleMessageListenerContainer-1] [net.zhaoxiaobin.rabbitmq.manual.MessageManualAckListenerConfig:32] [] body: { "createTime": "2020-09-17 22:35:28", "messageId": "25398d1a-474e-48cd-a3df-460b780e9d97", "messageData": "manualAck" } 2020-09-17 22:35:28,636 [INFO] [simpleMessageListenerContainer-1] [net.zhaoxiaobin.rabbitmq.manual.MessageManualAckListenerConfig:33] [] prop: { "headers": { "spring_listener_return_correlation": "a6622c5e-22f0-4f39-bea5-4360ef8de66b" }, "finalRetryForMessageWithNoId": false, "contentLengthSet": false, "deliveryTag": 3, "receivedExchange": "directExchange", "priority": 0, "receivedRoutingKey": "manualAck", "redelivered": false, "consumerTag": "amq.ctag-cqCpyMhe9Ak2vuv_RifFlQ", "receivedDeliveryMode": "PERSISTENT", "publishSequenceNumber": 0, "contentEncoding": "UTF-8", "contentLength": 0, "contentType": "text/plain", "consumerQueue": "manualAckQueue", "deliveryTagSet": true }
|
3.3、消费消息时的状态变化
通过打断点方式查看当消息未被确认时在RabbitMQ server
中的状态。
4、确认/拒绝消息
4.1、basicAck
确认消息。
第2个参数如果设为true
,则表示批量确认当前通道中所有deliveryTag
小于当前消息的所有消息。
4.2、basicNack
拒绝消息。
第2个参数如果设为true
,则表示批量拒绝当前通道中所有deliveryTag
小于当前消息的所有消息。
第3个参数如果设为true
,则表示当前消息再次回到队列中等待被再次消费。
4.3、basicReject
拒绝消息。与basicNack
作用类似,只不过一次只能拒绝单条消息。
对于拒绝消息并且重回队列使用时需要谨慎,避免使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压
参考链接
代码地址