个人博客
主要有2种方式:
- 指定一条消息的过期时间。
- 给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。
1、指定消息的过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class ProducerTTL { public static final String HOST = "148.70.153.63"; public static final String USER_NAME = "libai";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setPort(AMQP.PROTOCOL.PORT); connectionFactory.setUsername(USER_NAME); connectionFactory.setPassword(System.getProperty("password")); connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);
@Cleanup Connection connection = connectionFactory.newConnection(); @Cleanup Channel channel = connection.createChannel();
String exchangeName = "amq.direct", queueName = "TTLQueue", routingKey = "TTL"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey);
String msg = "测试消息自动过期"; AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(1) .expiration("30000") .build(); channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes()); } }
|
这里构建一个AMQP.BasicProperties
对象,设置过期时间,推送消息时传入该对象就可以了。
消息进入队列后,等待30秒后,消息自动过期就失效了。
注意:
RabbitMQ
只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。
2、给队列中的所有消息设置过期时间
需要把前面的TTLQueue
队列删除,否则会报错。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class ProducerTTL { public static final String HOST = "148.70.153.63"; public static final String USER_NAME = "libai";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setPort(AMQP.PROTOCOL.PORT); connectionFactory.setUsername(USER_NAME); connectionFactory.setPassword(System.getProperty("password")); connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);
@Cleanup Connection connection = connectionFactory.newConnection(); @Cleanup Channel channel = connection.createChannel();
String exchangeName = "amq.direct", queueName = "TTLQueue", routingKey = "TTL"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); Map<String, Object> argMap = new HashMap<>(); argMap.put("x-message-ttl", 30 * 1000); channel.queueDeclare(queueName, true, false, false, argMap); channel.queueBind(queueName, exchangeName, routingKey);
String msg = "测试消息自动过期"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); } }
|
声明1个x-message-ttl
的属性,同时设置过期时间。在创建队列时,传入该参数。
凡是推送到该队列中的所有消息,都会有一个30秒后过期的属性。
可以看到创建的队列有TTL
的特性,表示该队列中的消息会自动过期。
代码地址