个人博客
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
1、Maven依赖
1 2 3 4 5 6 7 8 9 10 11
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
|
2、配置文件
1 2 3 4 5 6 7 8 9
| server: port: 30200
spring: rabbitmq: host: 148.70.153.63 port: 5672 username: libai password: password
|
3、生产者
3.1、配置文件
声明队列和交换机。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Configuration public class FanoutRabbitConfig { @Bean public Queue queueA() { return new Queue("fanoutA", true, false, false); }
@Bean public Queue queueB() { return new Queue("fanoutB", true, false, false); }
@Bean public Queue queueC() { return new Queue("fanoutC", true, false, false); }
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange", true, false); }
@Bean public Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); }
@Bean public Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); }
@Bean public Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }
|
创建三个队列 :fanoutA
、fanoutB
、fanoutC
,将三个队列都绑定在交换机fanoutExchange
上。
因为是扇型交换机, 路由键无需配置,配置也不起作用。
3.2、发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @RestController public class SendMessageController { @Autowired private RabbitTemplate rabbitTemplate;
@PostMapping("/sendFanoutMessage") public String sendFanoutMessage() { Map<String, Object> map = new HashMap<>(); map.put("messageId", String.valueOf(UUID.randomUUID())); map.put("messageData", "testFanoutMessage"); map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); rabbitTemplate.convertAndSend("fanoutExchange", null, map); return "ok"; } }
|
发送到指定的交换机上,因为是扇形交换机,所以会把消息广播到所有和该交换机绑定的队列上。
启动服务,用postman
调用发送消息接口。
3.3、查看RabbitMQ
的后台管理界面
可以看到已经有3条消息推送到队列中,等待被消费。
3.4、查看交换机
可以看到和3个队列绑定,每发到该交换机上的一条消息都会被广播到这3个队列上。
4、消费者
通过注解@RabbitListener
指定要消费的队列。
1 2 3 4 5 6 7 8 9
| @Component @RabbitListener(queues = "fanoutA") @Slf4j public class FanoutReceiverA { @RabbitHandler public void process(Map testMessage) { log.info("FanoutReceiverA消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage)); } }
|
1 2 3 4 5 6 7 8 9
| @Component @RabbitListener(queues = "fanoutB") @Slf4j public class FanoutReceiverB { @RabbitHandler public void process(Map testMessage) { log.info("FanoutReceiverB消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage)); } }
|
1 2 3 4 5 6 7 8 9
| @Component @RabbitListener(queues = "fanoutC") @Slf4j public class FanoutReceiverC { @RabbitHandler public void process(Map testMessage) { log.info("FanoutReceiverC消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage)); } }
|
重新启动服务,可以看到控制台打印输出,说明该条消息已经被3个消费者消费成功了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 2020-09-13 21:47:56,509 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverA:25] [] FanoutReceiverA消费者收到消息:{ "createTime": "2020-09-13 21:38:57", "messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2", "messageData": "testFanoutMessage" } 2020-09-13 21:47:56,678 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#2-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverB:25] [] FanoutReceiverB消费者收到消息:{ "createTime": "2020-09-13 21:38:57", "messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2", "messageData": "testFanoutMessage" } 2020-09-13 21:47:56,885 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverC:25] [] FanoutReceiverC消费者收到消息:{ "createTime": "2020-09-13 21:38:57", "messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2", "messageData": "testFanoutMessage" }
|
参考链接
代码地址