个人博客
使用RabbitMQ
原生的API来操作消息的生产和消费。
首先先介绍一个简单的一个消息推送到接收的流程,提供一个简单的图:
黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是RabbitMQ
的服务器,然后经过服务器里面的交换机、队列等各种关系将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。
常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种:
Direct Exchange
直连型交换机,根据消息携带的路由值将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键routing key
。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
Fanout Exchange
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键是有规则的。
简单地介绍下规则:
*
(星号) 用来表示一个单词 (必须出现的)
#
(井号) 用来表示任意数量(零个或多个)单词
通配的路由键是跟队列进行绑定的,举个小例子:
队列Q1路由键为*.TT.*
,队列Q2路由键为TT.#
;
如果一条消息携带的路由值为A.TT.B
,那么队列Q1将会收到;
如果一条消息携带的路由值为TT.AA.BB
,那么队列Q2将会收到;
当一个队列的绑定键为#
(井号)的时候,这个队列将会无视消息的路由键,接收所有的消息。
当*
(星号)和#
(井号)这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机和直连交换机的功能。
常用的就是以上3种交换机,另外还有Header Exchange
头交换机,Default Exchange
默认交换机,Dead Letter Exchange
死信交换机。
1、Maven依赖
1 2 3 4 5 6 7 8 9 10 11
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
|
2、生产消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class Producer { public static final String HOST = "148.70.153.63"; public static final String USER_NAME = "libai";
public static void main(String[] args) throws IOException, TimeoutException { Producer producer = new Producer(); ConnectionFactory connectionFactory = producer.getConnectionFactory();
Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();
String exchangeName = "amq.direct", queueName = "apiQueue1", routingKey = "RabbitMQ"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey);
String msg = "Hello RabbitMQ!"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
channel.close(); connection.close(); }
public ConnectionFactory getConnectionFactory() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(HOST); connectionFactory.setPort(AMQP.PROTOCOL.PORT); connectionFactory.setUsername(USER_NAME); connectionFactory.setPassword(System.getProperty("password")); connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST); return connectionFactory; } }
|
- 创建连接
Connection
和通道Channel
。 - 声明交换机
Exchang
,如果不存在就会创建,可以指定交换机类型以及是否持久化。 - 声明队列
Queue
,如果不存在也会创建。 - 将队列
Queue
和交换机Exchang
绑定,并指定绑定的路由键RoutingKey
。 - 发送消息到指定的交换机
Exchang
,交换机会根据路由键RoutingKey
找到绑定的队列Queue
,并把消息发送到队列中。 - 关闭连接
Connection
和通道Channel
。
3、消费消息
3.1、消费者主动拉取模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Slf4j public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { Producer producer = new Producer(); ConnectionFactory connectionFactory = producer.getConnectionFactory();
Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();
GetResponse response = channel.basicGet("apiQueue1", true); String msg = Optional.ofNullable(response).map(GetResponse::getBody).map(String::new).orElse(null); log.info("消费消息:[{}]", msg); channel.close(); connection.close(); } }
|
这种模式在创建连接和通道后,主动从server中拉取消息,效率、性能比较低下。
3.2、server推送模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Slf4j public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { Producer producer = new Producer(); ConnectionFactory connectionFactory = producer.getConnectionFactory();
Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long now = System.currentTimeMillis(); if (now % 3 == 0) { log.info("手动确认消费消息:[{}]", new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } else if (now % 3 == 1) { log.info("basicNack:[手动拒绝消息,重回队列]"); channel.basicNack(envelope.getDeliveryTag(), false, true); } else if (now % 3 == 2) { log.info("basicReject:[手动拒绝消息,重回队列]"); channel.basicReject(envelope.getDeliveryTag(), true); } } }; channel.basicConsume("apiQueue1", false, consumer); } }
|
这种模式在创建连接和通道后就进入等待消费的状态,等待队列推送消息然后进行消费。
手动ACK模式,在成功消费消息后需要手动ACK确认,也可以拒绝当前消息并指定是否重回队列中被再次消费。
参考链接
代码地址