MQ的概念


为什么使用MQ

MQ即消息队列。它的作用大概可以概括为:解耦、异步、削峰。

  • 解耦:例如A系统产生的数据,BCD系统都需要使用,则可以把数据发送到MQ中,让BCD自行去消费。系统之间用MQ解耦。
  • 异步:例如一个rest api需要操作ABC三个系统进行写操作,耗时较长。如果不要求精确实时读取,则可以把写操作放入MQ中异步去进行,减少耗时,提高响应速度。
  • 削峰:在高并发的场景下,大量请求同时到达服务器,如果同时操作数据库的话,数据库可能无法抗住压力,这时可以把请求放入MQ,慢慢去消费,减小数据库的压力。

MQ的引入也会带来一些问题。比如引入中间件后,系统复杂度上升,可用性降低,并且会带来一致性的问题。

MQ的选择

常见的MQ有RabbitMQ、RocketMQ、Kafka。这里我选择RocketMQ为例来介绍,后续的内容,也基于RocketMQ。

为什么选择RocketMQ。其单机吞吐量高,并可以支持大量topic,延迟低,并且是分布式架构。另外,它还支持事务消息、顺序消息等。



MQ的可靠性


MQ的消息丢失可能发生在生产者、MQ、消费者三个地方。

生产者丢失消息

生产者丢失消息,可能是发送消息时就发送失败,或者发送成功了,但是MQ没有收到消息。

对于发送失败,可以开启重发。对于发送成功,但MQ没有收到,可以使用同步消息。而如果对响应时间有要求,需使用异步消息时,可以通过异步消息+回调+本地消息表的方式解决。即首先在本地消息表插入数据,状态为发送中,然后异步发送消息,如果MQ成功接收消息,则回调并更新本地消息表状态为发送成功。同时开启定时任务,轮询本地消息表中未发送成功的消息,并重新发送,当重试次数超过一定次数,通知开发人员人工处理。

另外,也可以使用事务消息。事务消息可以保证本地事务+MQ发送一定成功。事务消息具体原理请看下文事务消息的部分。

MQ丢失消息

MQ本身丢失消息,一般是当MQ收到消息后,消息在内存中,这时如果机器宕机,则消息丢失。

对于这样情况,可以开启消息的持久化机制。持久化分同步和异步两种。同步方式能保证消息不丢失,但是对于性能会有影响,异步方式则不能保证消息完全不丢失。这样需要根据业务自行平衡。

消费者丢失消息

消费者丢失消息,一般是消费者收到消息,还未消费消息,这时服务器宕机,消息丢失。

对于这种情况,可以在消费者中注册监听器,当消费者获取到消息,就回调监听器,并执行消费,当消费成功时,才返回消费成功的状态给MQ。如果消费时出现异常,超时,或者取到消费后宕机,MQ会间隔性重发,知道消息返回成功。当重发次数超过一定次数时,消息会放入死信队列,由人工来处理。



MQ的事务消息


RocketMQ拥有事务消息,其能保证本地事务和消息的发送能一起执行。

其原理是:

  • 首先生产者发送一条prepare消息,该消息对消费者不可见。
  • broker预提交消息,把消息存在commitlog中,并对消费者不可见。
  • broker预提交成功后,回调通知生产者,生产者执行本地事务。如果本地事务成功,则向broker发送commit指令。如果本地事务失败,则发送rollback指令。
  • 当broker再次接收到commit指令,会提交这个消息到consumeQueue,使消息对消费者可见,则事务消息发送成功。而如果收到rollback指令,则回滚掉预提交的消息,消息取消发送。

在此过程中可能会出现以下情况:

  • 预消息发送失败,则生产者就会收到异常反馈,可以进行重发。
  • 预消息发送成功,本地事务失败,则事务会回滚,消息也会被取消。
  • 预消息发送成功,本地事务成功,但再次确认的commit指令发送失败。那么MQ会进行状态回查,即MQ定时去查询commitlog中的预消息,对于长时间状态未修改的消息,发起对生产方本地事务执行情况的回查,根据回查的结果,修改事务状态,并向broker再次发送指令。(需查看RocketMQ版本,4.2.0之前的版本,未支持本地事务状态回查,需要自行增加一个本地消息表来保证本地事务的状态)



MQ的顺序消息


在一些场景中,我们希望发送和消费的消息具体顺序性。比如一系列操作分为多条消息,每条消息间具体顺序。这里就可以使用RocketMQ的顺序消息。

MQ的顺序有全局顺序和分区顺序。因为分布式MQ架构中,消息会被发到不同的分区(queue)。一般来说大部分场景都是保证分区顺序即可,而如果需要全局顺序,可以让消息只发送到一个本区。

MQ顺序消息的原理是:

  • 生产者发送要有序。即生产者在发送消息时,通过MessageQueueSelector和同一系列消息的标识,把同一系列的消息都有序地发送到同一个分区。
  • 消费者消费时要有序。即消费者在拉取消息消费时,因为发送和存储是有序的,则拉取也是须有的。获取消息时单线程执行,保证有序性。



重复消息问题


重复消息的原因

首先要为什么会出现重复消息。在消费者消费的时候,如果消费者消费完成,回调通知broker时。由于网络等原因,造成通知丢失,从而造成MQ重复发送消息。

对于重复消息,需要看对应的业务进行重复操作时,是否会对结果产生影响。如果是查询,或者是根据唯一索引插入数据时,重复操作并不会产生影响。而如果是扣减某个字段的值之类的操作,则重复操作会影响其结果。这样就需要对该操作做幂等性处理。

RocketMQ本身不处理重复消息,而是交由业务端处理。而重复发送消息最终都是导致重复消费。所以,我们需要对消费做幂等处理。

重复消息的处理

可以建立一个本地消息记录表,消费者每次消费前,查询记录表中消息的状态,如果是未消费成功,则消费。并在消费者消费成功后,更新记录表中的状态。之后如果有重复的消息,查询到记录表中的状态已为消费成功,则不再消费。

既然只是需要在短时间内记录消息的状态,那么可以考虑用缓存代替本地消息记录表。每次消费前在Redis中查询是否有消费记录,如果没有则执行消费。如果已有消费记录,则不再执行消费。



MQ的可用性


RocketMQ支持集群部署,保证其可用性。

NameServer

NameServer无状态,节点间数字一致。所以多节点NameServer可在一部分节点不可用时,保证整体服务正常。且生产者在第一次发送消息时会从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

Broker

Broker有几种部署方式。单master、多master、多master多slave。master和slave间有同步复制和异步复制。当有master宕机时,其对应的salve仍能提供服务。如果是多组master-salve,则其他组也能提供服务。

master-slave同步:

  • broker的master收到消息后,消息标记为uncommitted状态。
  • master发送消息给所有slave节点,salve节点收到消息后,发送ack确认给master。
  • 当master收到超过半数的salve的ack确认,则把消息标记为committed。
  • 最后发送committed状态给salve,salve修改消息状态为committed。

Producer

Producer生产者在发送消息时,把topic的多个message queue创建到多个broker组(broker同名不同id为同组,id为0为master,id>0为slave)。当一组broker不可用时,可以发送给其他组。

Consumer

Consumer消费者在消费时,如果master不可以,则自动切换到salve拉取。



RocketMQ的基本原理


实现原理

RocketMQ由NameServer注册中心,Broker,Producer生产者,Consumer消费者组成。运行过程:

  • Broker启动时,向NameServer注册,并每30s发送一次心跳,保存连接。
  • Producer发送消息时,从NameServer获取Broker地址,根据负载均衡算法发送消息至其中一个Broker。
  • Consumer从Broker拉取消息消费。

Broker数据的存储

Broker消息存储由commitlog、consumequeue、indexfile完成。

  • commitlog在broker每次收到消息后,都会把消息保存在commitlog文件上。
  • consumequeue用来保存消息在commitlog上的偏移量offset,即消息在commitlog上的物理位置。
  • indexfile用来保存messageKey和commitlog中的偏移量offset,可以用于根据messageKey查询消息。

消息写入时,顺序写入commitlog,读取时通过consumerqueue快速定位消息在commitlog中的位置。保证了RocketMQ的读写速度。



消息积压问题


当消费者出错或者因为其他问题,导致消费不能正常进行,一段时间后可能会发送消息积压。

处理方法:

  • 写一个临时的consumer消费程序,让他取消费积压的消息,消费时不做处理,而是转发到一个新的topic和MQ中。新的MQ和topic重新单独申请部署,数量要扩大到能承受当前积压的消息。
  • 修复出错的consumer。修复后,重新拉取新MQ中的消息来消费。



分布式事务


MQ还可用于在微服务中做分布式事务。例如在系统A中完成本地事务后,发送消息到系统B中完成操作。使用RocketMQ的事务消息,可以保证本地事务和发送消息一起执行。而MQ本身的机制则保证了消息能被消费。

使用MQ做分布式事务,达到的是最终一致性。当消费者消费发送异常时,则MQ会不断发起重试,直到重试超过一定次数,则发送告警,由人工处理。