个人博客


1、生产者、交换机、队列、消费者关系

2、如何保证RabbitMQ的高可用

RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

  • 单机模式Demo 级别,一般生产不会使用。
  • 普通集群模式:普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。


这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。

而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。

所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

  • 镜像集群模式:这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。

    那么如何开启这个镜像集群模式呢?其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue

3、避免消息堆积

主要有以下场景会容易导致消息堆积:

  • 生产者和消费者没有同时在线的情况,只有生产没有消费。
  • 消费端限流,通过channel.basicQos(int prefetchCount)函数限制消费者的处理速率,从而导致队列中的消息堆积直到队列塞满为止。
  • 在手动 ACK 的情况下,如果消费端拒绝消息并且重回队列,且在一些极端时候,消费端持续拒绝消息就会发生消息堆积的问题。
  • 对单条消息设置 TTL ,如果先入队列的过期时间设置比较长,后面的消息过期时间设置比较短,则队列中会有很多死消息不能被及时地淘汰,从而导致消息的堆积。

消息堆积容易造成队列满后的消息丢失,而且场景3可能还会出现重复消费的情况,不能保证消费消息幂等性

4、避免消息丢失

RabbitMQ丢失消息主要分3种情况:

  1. 生产者将消息发送给RabbitMQ时丢失。
  2. RabbitMQ收到消息后意外丢失。
  3. 消费者处理异常导致丢失。


如何避免生产者丢失数据

  • 开启事务机制,但是RabbitMQ的事务机制是同步的,所以开启后吞吐量会下降,对性能会有很大影响一般不建议使用。
  • 开启消息发送确认机制(confirm 模式),在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

如何避免RabbitMQ丢失数据
开启持久化机制,主要有2个步骤:

  • 创建 queue 的时候将其设置为持久化,这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
  • 发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack ,也是可以自己重发的。

如何避免消费者丢失数据
消费端丢失消息一般是在自动 ack 的情况下,拿到消息在处理过程中发生异常导致数据丢失。所以在对消息传递可靠性要求比较高的情况下需要切换到手动 ack 的模式,只有最后消费成功了,再给RabbitMQ确认。

5、如何保证不重复消费

开启自动 ack 模式,消息只投递给消费者一次,只要投递完成不管消费者有没有消费成功都确认消息。

这样做的弊端也很明显,如果消费失败就会导致消息丢失。但如果开启手动 ack 模式,可能消费消息明明已经成功,但在最后因为一些不可控的因素导致不可预料的异常发生,最终没有 ack 成功从而让消息再次回到队列又给其它消费者去重复消费。这个问题本质上是因为消费消息和手动 ack 确认这是2个独立的操作,很难保证这2个操作的原子性(同时成功)。

所以一般来讲,光靠MQ自身的机制很难同时保证消息不丢失和不重复消费消息。具体怎么应用要看实际业务场景;比如是比较重要的数据,我们可以优先保证消息不丢失,允许少量的重复消费,只要对消费者的下游做好幂等即可。

6、对比Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支撑高吞吐10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内
可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同 RocketMQ
功能支持MQ 领域的功能极其完备基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

综上,各种对比之后,有如下建议:

一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以不推荐用这个;

后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。