个人博客


消息接收的确认机制主要有三种模式:

  1. 自动确认AcknowledgeMode.NONE
    RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
    所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
    一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

  2. 根据情况确认AcknowledgeMode.AUTO
    这也是SpringBoot集成RabbitMQ默认的消息确认情况,如果消费消息时有异常抛出,则会拒绝消息,反之如果没有捕获到异常则确认本次消费成功。

  3. 手动确认AcknowledgeMode.MANUAL
    这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
    消费者收到消息后,手动调用basicAck/basicNack/basicReject后,RabbitMQ收到这些消息后,才认为本次投递成功。

1、创建手动确认消息的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class DirectRabbitConfig {
//Direct交换机 起名:directExchange
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange", true, false);
}

//需要手动确认消息的队列
@Bean
public Queue manualAckQueue() {
return new Queue("manualAckQueue", true, false, false);
}

//手动确认消息的队列和直连交换机绑定
@Bean
public Binding bindingDirectForManualAck() {
return BindingBuilder.bind(manualAckQueue()).to(directExchange()).with("manualAck");
}
}

2、手动确认消息的监听实现

2.1、通过配置实现

1
2
3
4
5
6
7
8
9
10
11
12
spring:
rabbitmq:
host: 148.70.153.63
port: 5672
username: libai
password: password
listener:
simple:
# 手动确认
acknowledge-mode: manual
# 拒绝消息是否重回队列
default-requeue-rejected: true

2.2、配置类实现(更加灵活)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
@Slf4j
public class MessageManualAckListenerConfig {
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置需要手动确认消息的队列,可以同时设置多个,前提是队列需要提前创建好
container.setQueueNames("manualAckQueue");
// 设置监听消息的方法,匿名内部类方式
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
// 开始消费消息
log.info("body:\n{}", JSONUtil.toJsonPrettyStr(new String(message.getBody())));
log.info("prop:\n{}", JSONUtil.toJsonPrettyStr(message.getMessageProperties()));

// 手动确认
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 肯定确认
});
return container;
}
}

body:接收的消息内容。
messageProperties:消息的相关属性。

3、发送消息测试手动确认

3.1、调用接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
public class MessageManualAckController {
@Autowired
private RabbitTemplate rabbitTemplate;

@PostMapping("/manualAck")
public String manualAck() {
Map<String, Object> map = new HashMap<>();
map.put("messageId", String.valueOf(UUID.randomUUID()));
map.put("messageData", "manualAck");
map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
rabbitTemplate.convertAndSend("directExchange", "manualAck", JSONUtil.toJsonStr(map));
return "ok";
}
}

3.2、查看控制台打印输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2020-09-17 22:35:28,635 [INFO] [simpleMessageListenerContainer-1] [net.zhaoxiaobin.rabbitmq.manual.MessageManualAckListenerConfig:32] [] body:
{
"createTime": "2020-09-17 22:35:28",
"messageId": "25398d1a-474e-48cd-a3df-460b780e9d97",
"messageData": "manualAck"
}
2020-09-17 22:35:28,636 [INFO] [simpleMessageListenerContainer-1] [net.zhaoxiaobin.rabbitmq.manual.MessageManualAckListenerConfig:33] [] prop:
{
"headers": {
"spring_listener_return_correlation": "a6622c5e-22f0-4f39-bea5-4360ef8de66b"
},
"finalRetryForMessageWithNoId": false,
"contentLengthSet": false,
"deliveryTag": 3,
"receivedExchange": "directExchange",
"priority": 0,
"receivedRoutingKey": "manualAck",
"redelivered": false,
"consumerTag": "amq.ctag-cqCpyMhe9Ak2vuv_RifFlQ",
"receivedDeliveryMode": "PERSISTENT",
"publishSequenceNumber": 0,
"contentEncoding": "UTF-8",
"contentLength": 0,
"contentType": "text/plain",
"consumerQueue": "manualAckQueue",
"deliveryTagSet": true
}

3.3、消费消息时的状态变化

通过打断点方式查看当消息未被确认时在RabbitMQ server中的状态。

-w628

4、确认/拒绝消息

4.1、basicAck

确认消息。
第2个参数如果设为true,则表示批量确认当前通道中所有deliveryTag小于当前消息的所有消息。

4.2、basicNack

拒绝消息。
第2个参数如果设为true,则表示批量拒绝当前通道中所有deliveryTag小于当前消息的所有消息。
第3个参数如果设为true,则表示当前消息再次回到队列中等待被再次消费。

4.3、basicReject

拒绝消息。与basicNack作用类似,只不过一次只能拒绝单条消息。

对于拒绝消息并且重回队列使用时需要谨慎,避免使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压

参考链接

代码地址