个人博客
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键是有规则的。简单地介绍下规则:
*
(星号) 用来表示一个单词 (必须出现的)
#
(井号) 用来表示任意数量(零个或多个)单词
通配的路由键是跟队列进行绑定的,举个小例子:
队列Q1路由键为*.TT.*
,队列Q2路由键为TT.#
;
如果一条消息携带的路由值为A.TT.B
,那么队列Q1将会收到;
如果一条消息携带的路由值为TT.AA.BB
,那么队列Q2将会收到;
当一个队列的绑定键为#
(井号)的时候,这个队列将会无视消息的路由键,接收所有的消息。
当*
(星号)和#
(井号)这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。所以主题交换机也就实现了扇形交换机和直连交换机的功能。
1、Maven依赖
1 2 3 4 5 6 7 8 9 10 11
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
|
2、配置文件
1 2 3 4 5 6 7 8 9
| server: port: 30200
spring: rabbitmq: host: 148.70.153.63 port: 5672 username: libai password: password
|
3、生产者
3.1、配置文件
声明队列和交换机。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Configuration public class TopicRabbitConfig { @Bean public Queue firstQueue() { return new Queue("topicQueue1", true, false, false); }
@Bean public Queue secondQueue() { return new Queue("topicQueue2", true, false, false); }
@Bean public TopicExchange exchange() { return new TopicExchange("topicExchange", true, false); }
@Bean public Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with("topic.man"); }
@Bean public Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } }
|
3.2、发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @RestController public class SendMessageController { @Autowired private RabbitTemplate rabbitTemplate;
@PostMapping("/sendTopicMessage1") public String sendTopicMessage1() { Map<String, Object> manMap = new HashMap<>(); manMap.put("messageId", String.valueOf(UUID.randomUUID())); manMap.put("messageData", "man"); manMap.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap); return "ok"; }
@PostMapping("/sendTopicMessage2") public String sendTopicMessage2() { Map<String, Object> womanMap = new HashMap<>(); womanMap.put("messageId", String.valueOf(UUID.randomUUID())); womanMap.put("messageData", "woman"); womanMap.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap); return "ok"; } }
|
根据主题交换机的特性,携带路由键topic.man
的消息将会发送到topicQueue1
和topicQueue2
这2个队列中,携带路由键topic.woman
的消息只会发送到topicQueue2
队列中。
启动服务,用postman
分别调用发送消息的2个接口。
3.3、查看RabbitMQ
的后台管理界面
可以看到已经有3条消息推送到队列中,等待被消费。
3.4、查看队列
topicQueue1
上有1条消息等待被消费,topicQueue2
上则有2条消息等待被消费。
4、消费者
通过注解@RabbitListener
指定要消费的队列。
1 2 3 4 5 6 7 8 9
| @Component @RabbitListener(queues = "topicQueue1") @Slf4j public class TopicReceiver1 { @RabbitHandler public void process(Map testMessage) { log.info("TopicReceiver1消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage)); } }
|
1 2 3 4 5 6 7 8 9
| @Component @RabbitListener(queues = "topicQueue2") @Slf4j public class TopicReceiver2 { @RabbitHandler public void process(Map testMessage) { log.info("TopicReceiver2消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage)); } }
|
重新启动服务,可以看到控制台打印输出,说明该条消息已经被消费者消费成功了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 2020-09-13 22:41:45,893 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#4-1] [net.zhaoxiaobin.rabbitmq.consumer.topic.TopicReceiver1:25] [] TopicReceiver1消费者收到消息:{ "createTime": "2020-09-13 22:35:07", "messageId": "6a5a2402-8dca-42c5-8778-f60c082e8a53", "messageData": "man" } 2020-09-13 22:41:46,056 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#5-1] [net.zhaoxiaobin.rabbitmq.consumer.topic.TopicReceiver2:25] [] TopicReceiver2消费者收到消息:{ "createTime": "2020-09-13 22:35:07", "messageId": "6a5a2402-8dca-42c5-8778-f60c082e8a53", "messageData": "man" } 2020-09-13 22:41:46,057 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#5-1] [net.zhaoxiaobin.rabbitmq.consumer.topic.TopicReceiver2:25] [] TopicReceiver2消费者收到消息:{ "createTime": "2020-09-13 22:35:09", "messageId": "c179f30a-9746-4d9e-9617-c667d26d74be", "messageData": "woman" }
|
参考链接
代码地址