0. 前言

在了解基本的MQ发布订阅机制后,我脑中就有一个疑惑,发布者完成自己的业务然后通过发送消息到MQ,但如果消息到达MQ后此时MQ却宕机了导致消息丢失了,或者MQ向订阅者发送消息时,发送失败了,那么这些情况都会导致下游的消费者和上游的发布者的业务处理数据不一致的情况,在很多业务上都会出现bug,甚至造成不可挽回的损失,这怎么处理呢?

那么我们应该要有相应的措施,来解决MQ在消息发送过程中可能会出现的消息丢失的情况,保证消息的可靠性,进而保障业务的安全运行!

在MQ工作流程中,以下原因会导致消息丢失:

  • 发送消息时丢失:

    • 生产者发送消息时连接MQ失败

    • 生产者发送消息到达MQ后未找到Exchange

    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue

    • 消息到达MQ后,处理消息的进程发生异常

  • MQ导致消息丢失:

    • 消息到达MQ,保存到队列后,尚未消费就突然宕机

  • 消费者处理消息时:

    • 消息接收后尚未处理突然宕机

    • 消息接收后处理过程中抛出异常

那应该如何确保MQ消息的可靠性?可以从三方面保障:保障发送者的可靠性、保障MQ的可靠性、保障消费者的可靠性

1.发送者的可靠性

1.1 生产者重试机制

当生产者发送消息时,出现网络故障导致MQ没有收到消息,可以使用SpringAMQP提供的消息发送时的重试机制

即:当RabbitTemplate与MQ连接超时后,多次重试。

只需要添加以下配置即可:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数
        max-attempts: 3 # 最大重试次数

注意:这种重试机制是阻塞式的重试,也就是说重试过程时,当前线程是阻塞的,若对业务性能有要求,建议禁用重试功能

1.2 生产者确认机制

当发送者发送消息到MQ后仍丢失,可以针对这种情况开启生产者消息确认机制

当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执 (ACK、NACK、return)

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。

    **实现过程略**(其实也就添加配置即可)

注意:

开启生产者确认机制比较消耗MQ性能,一般不建议开启。并且触发确认只有以下几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致

  • 交换机名称错误:同样是编程错误导致

  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

    ** 综上,一般场景下并不需要开启这种机制!**

2. MQ的可靠性

2.1 数据持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化

    • 在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数为Durable即可

    • 设置为Durable就是持久化模式,Transient就是临时模式

  • 队列持久化

    • 在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数

    • 同样,设置为Durable就是持久化模式,Transient就是临时模式。

    • 队列这里还有其他的参数,可以实现更多的功能

  • 消息持久化

    • 在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties

    • properties中添加如下参数:delivery_mode=2 即可实现消息持久化

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

2.2 LazyQueue惰性队列

在某些情况下,MQ会产生消息积压,导致内存占用达到预警上限,为了解决这个问题,从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存

  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)

  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

配置LazyQueue的三种方式

(1)控制台添加参数方式

只需在添加队列的时候,添加 x-queue-mod=lazy 参数即可设置队列为Lazy模式:

(2)代码配置Lazy模式

@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 开启Lazy模式 ,底层源码仍为第一种添加参数的形式
            .build();
}加参数的形式
            .build();
}

(3)基于注解来声明队列并配置Lazy模式

@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")  // 其实也是使用参数形式
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}

3. 消费者的可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障

  • 消费者接收到消息后突然宕机

  • 消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息

3.1 消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息

  • nack:消息处理失败,RabbitMQ需要再次投递消息

  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

那么消息确认回执如何实现呢?

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认回执。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • **none**:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • **manual**:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式(推荐)。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject;

配置消费者确认机制

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack
# 当我们把配置改为 auto 时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。息处理失败后,会回到RabbitMQ,并重新投递到消费者。

3.2 失败重试机制

如果消息发送到消费者后,总是执行出错,消息重新投递,但一直出错,MQ就会陷入循环导致系统压力增大。所以又提出了:

消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

配置:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试

  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

3.3 失败处理策略

如果业务对消息可靠性要求很高的话,那么上面的失败重试机制中消息被丢弃也是不能容忍的,那么就要有一个失败处理的策略

这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

推荐最后一种方式,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

具体实现即在消费者模块正常创建一个交换机和队列并绑定,且加上消息转换器。

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

这样,就可以定义一些方法来监听这个error队列,当出现重试也无法解决的错误时,就会调用这些方法来完成错误处理的业务

@RabbitListener(queues = "error.queue")
public void listenErrorQueue(String msg) {
    // 这里完成error错误处理
    // 例如:给用户一个友好的界面、向开发人员发送邮件短信等通知错误、
    // ...
}

3.4 业务幂等性的保障

业务幂等性即指同一个业务,执行一次或多次对业务状态的影响是一致的

数据的删除、查找往往是自带幂等的,而数据的更新往往都是非幂等的,如果重复执行可能造成不一样的后果

保证消息处理的幂等性。这里给出两种实现方案:

  • 唯一消息ID

  • 业务状态判断

3.4.1 唯一消息ID

思路如下:

(1)每一条消息都生成一个唯一的id,与消息一起投递给消费者。

(2)消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库

(3)如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

3.4.2 消息幂等性的其他解决方案

其他解决方案,请参考我另一篇文章,专门给出了更多的保障业务幂等性的解决方案。

3.5 兜底方案

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一万一真的MQ通知失败该怎么办呢?

兜底解决方案:

消费者利用定时任务定期主动查询,并利用查询得到的结果完成后面的业务

至此,消息可靠性的问题就得到了完美解决!

4. 延迟消息

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL

  • 延迟消息插件

4.1 死信交换机和延迟消息

4.1.1 死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息,超时无人消费

  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息

  2. 收集那些因队列满了而被拒绝的消息

  3. 收集因TTL(有效期)到期的消息

4.1.2 延迟消息

发送的消息携带过期时间,当投递到ttl.queue时,由于没有消费者所以暂存,当到达过期时间就成为了死信

死信被再次投递到死信交换机hmall.direct,并沿用之前的RoutingKey,也就是blue,此时消息就被投递到了direct目标队列

这时有消费者监听此队列的话,消息就得到了消费,即实现了延迟消息

** 注意:**
RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确

4.2 DelayExchange插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:
Scheduling Messages with RabbitMQ | RabbitMQ - Blog

4.2.1 下载

插件下载地址:
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
由于我们安装的MQ是3.8版本,因此这里下载3.8.17版本:

4.2.2 安装

一开始在使用Docker启动RabbitMQ时,我们使用的命令是:

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \   # 这里挂载了插件目录
 --name mq \
 --hostname mq \
 -p 15672:15672 \     # 15672是web控制台端口
 -p 5672:5672 \       # 5672是MQ的收发消息端口
 --network hmall \    # 这里配置网络名,可以使用命令先创建好网络 docker network creat <网络>
 -d \
 rabbitmq:3.8-management

我们在启动MQ时对插件目录进行了挂载,可以使用以下命令查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-pluginsdocker volume inspect mq-plugins

在结果中找到”Mountpoint“: “/var/lib/docker/volumes/mq-plugins/_data”

通过Mountpoint属性拿到后面的插件挂载目录,可能与我的这里不一样,使用自己的插件路径,我们上传准备好的插件到该目录下。

然后属于以下命令安装插件

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchangerabbitmq_delayed_message_exchange

4.2.3 代码声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

4.2.4 代码发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}
        }
    });
}

注意:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息