个人博客


主要有2种方式:

  1. 指定一条消息的过期时间。
  2. 给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。

1、指定消息的过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ProducerTTL {
public static final String HOST = "148.70.153.63";
public static final String USER_NAME = "libai";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(AMQP.PROTOCOL.PORT);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(System.getProperty("password"));
connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);

// 创建连接和通道
@Cleanup Connection connection = connectionFactory.newConnection();
@Cleanup Channel channel = connection.createChannel();

String exchangeName = "amq.direct", queueName = "TTLQueue", routingKey = "TTL";
// 声明交换机(如果不存在才创建),交换机名称、类型、交换机是否持久化
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
// 声明队列(如果不存在才创建),队列名称、队列是否持久化、是否排他(连接可见性)、是否自动删除(所有消费者断开连接后删除队列)、参数
channel.queueDeclare(queueName, true, false, false, null);
// 将队列和交换机绑定并指定路由键
channel.queueBind(queueName, exchangeName, routingKey);

// 发送消息
String msg = "测试消息自动过期";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(1) // 1-表示消息不做持久化,2-表示消息会持久化到磁盘(对性能会有些影响)
.expiration("30000") // 设置消息过期时间,单位:毫秒
.build();
// 把消息发送到指定的交换机,交换机根据路由键推送到绑定的队列中;交换机名称、路由键、属性、消息字节
channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
}
}

这里构建一个AMQP.BasicProperties对象,设置过期时间,推送消息时传入该对象就可以了。

消息进入队列后,等待30秒后,消息自动过期就失效了。

-w878

注意
RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。

2、给队列中的所有消息设置过期时间

需要把前面的TTLQueue队列删除,否则会报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ProducerTTL {
public static final String HOST = "148.70.153.63";
public static final String USER_NAME = "libai";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(AMQP.PROTOCOL.PORT);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(System.getProperty("password"));
connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);

// 创建连接和通道
@Cleanup Connection connection = connectionFactory.newConnection();
@Cleanup Channel channel = connection.createChannel();

String exchangeName = "amq.direct", queueName = "TTLQueue", routingKey = "TTL";
// 声明交换机(如果不存在才创建),交换机名称、类型、交换机是否持久化
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
Map<String, Object> argMap = new HashMap<>();
argMap.put("x-message-ttl", 30 * 1000); // 设置队列里消息的ttl的时间30s
// 声明队列(如果不存在才创建),队列名称、队列是否持久化、是否排他(连接可见性)、是否自动删除(所有消费者断开连接后删除队列)、参数
channel.queueDeclare(queueName, true, false, false, argMap);
// 将队列和交换机绑定并指定路由键
channel.queueBind(queueName, exchangeName, routingKey);

// 发送消息
String msg = "测试消息自动过期";
// 把消息发送到指定的交换机,交换机根据路由键推送到绑定的队列中;交换机名称、路由键、属性、消息字节
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}

声明1个x-message-ttl的属性,同时设置过期时间。在创建队列时,传入该参数。
凡是推送到该队列中的所有消息,都会有一个30秒后过期的属性。

可以看到创建的队列有TTL的特性,表示该队列中的消息会自动过期。

-w989

代码地址