个人博客


扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

1、Maven依赖

1
2
3
4
5
6
7
8
9
10
11
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

2、配置文件

1
2
3
4
5
6
7
8
9
server:
port: 30200

spring:
rabbitmq:
host: 148.70.153.63
port: 5672
username: libai
password: password

3、生产者

3.1、配置文件

声明队列和交换机。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue queueA() {
return new Queue("fanoutA", true, false, false);
}

@Bean
public Queue queueB() {
return new Queue("fanoutB", true, false, false);
}

@Bean
public Queue queueC() {
return new Queue("fanoutC", true, false, false);
}

@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange", true, false);
}

@Bean
public Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}

@Bean
public Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}

@Bean
public Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}

创建三个队列 :fanoutAfanoutBfanoutC,将三个队列都绑定在交换机fanoutExchange上。

因为是扇型交换机, 路由键无需配置,配置也不起作用。

3.2、发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate; // 使用RabbitTemplate,这提供了接收/发送等等方法

@PostMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
Map<String, Object> map = new HashMap<>();
map.put("messageId", String.valueOf(UUID.randomUUID()));
map.put("messageData", "testFanoutMessage");
map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return "ok";
}
}

发送到指定的交换机上,因为是扇形交换机,所以会把消息广播到所有和该交换机绑定的队列上。

启动服务,用postman调用发送消息接口。

3.3、查看RabbitMQ的后台管理界面

可以看到已经有3条消息推送到队列中,等待被消费。
-w817

3.4、查看交换机

可以看到和3个队列绑定,每发到该交换机上的一条消息都会被广播到这3个队列上。
-w652

4、消费者

通过注解@RabbitListener指定要消费的队列。

1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = "fanoutA")
@Slf4j
public class FanoutReceiverA {
@RabbitHandler
public void process(Map testMessage) {
log.info("FanoutReceiverA消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
}
}
1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = "fanoutB")
@Slf4j
public class FanoutReceiverB {
@RabbitHandler
public void process(Map testMessage) {
log.info("FanoutReceiverB消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
}
}
1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = "fanoutC")
@Slf4j
public class FanoutReceiverC {
@RabbitHandler
public void process(Map testMessage) {
log.info("FanoutReceiverC消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
}
}

重新启动服务,可以看到控制台打印输出,说明该条消息已经被3个消费者消费成功了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2020-09-13 21:47:56,509 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverA:25] [] FanoutReceiverA消费者收到消息:{
"createTime": "2020-09-13 21:38:57",
"messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
"messageData": "testFanoutMessage"
}
2020-09-13 21:47:56,678 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#2-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverB:25] [] FanoutReceiverB消费者收到消息:{
"createTime": "2020-09-13 21:38:57",
"messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
"messageData": "testFanoutMessage"
}
2020-09-13 21:47:56,885 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] [net.zhaoxiaobin.rabbitmq.consumer.fanout.FanoutReceiverC:25] [] FanoutReceiverC消费者收到消息:{
"createTime": "2020-09-13 21:38:57",
"messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
"messageData": "testFanoutMessage"
}

参考链接

代码地址