本文对RabbiteMQ的学习做一个总结,希望能加深巩固我对MQ的运行机制和MQ的一些高级特性,以及消息队列的应用场景和对应的解决方案,为微服务应用开发打下坚实的基础!

分为两篇文章,本篇为基础篇,介绍了MQ的机制和基本应用。进阶篇介绍了MQ的一些高级特性以及如何保证消息的可靠性和延迟消息的实现

1.常见MQ对比

2.安装RabbitMQ

Docker一条命令安装

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \     # 15672是web控制台端口
 -p 5672:5672 \       # 5672是MQ的收发消息端口
 --network hmall \    # 这里配置网络名,可以使用命令先创建好网络 docker network creat <网络>
 -d \
 rabbitmq:3.8-management

3.RabbitMQ架构图

4.交换机类型

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!以下是四种交换机类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

4.1 Direct交换机

Direct与Fanout交换机的差异

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列

  • Direct交换机根据RoutingKey判断路由给哪个队列

  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

4.2 Topic交换机

Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割

  • Topic交换机与队列绑定时的bindingKey可以指定通配符

  • #:代表0个或多个词

  • ‘*’:代表1个词

5.RabbitMQ的使用

(1)引入SpringAMQP

<!--消息发送-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2)配置MQ地址

spring:
  rabbitmq:
    host: 192.168.150.101 # 你的主机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码
  • 创建队列 : new Queue类

  • 创建交换机 : new Exchange类(接口) –> 子类有TopicExchange、FanoutExchange、DirectExchange等

  • 绑定交换机和队列 : 使用BindingBuilder来创建Binding对象来绑定交换机和队列

5.1 发布者/生产者模块

(1)发布者和订阅者都要引入依赖

<!--消息发送-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2)调用API发送消息

// 首先注入RabbitTemplate对象
@Autowired
private RabbitTemplate rabbitTemplate;

// 然后在方法中就可以调用API进行消息发送了
rabbitTemplate.convertAndSend(exchangeName, "RoutingKey", message);

5.2 订阅者/消费者模块

5.2.1基于@Bean方式声明交换机和队列

@Configuration
public class FanoutConfig {
 /**
     * 声明交换机
     * @return Fanout类型交换机,也可以是其他类型的交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }
    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
  }

5.2.2基于注解方式声明交换机和队列

使用Direct类型交换机和队列

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),  // 队列声明
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), // 交换机声明
    key = {"red", "blue"} // RoutingKey声明
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

使用Topic类型交换机和队列

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"   // Topic交换机使用通配符指定Key
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

6. 消息转换器(配置json消息转换器)

在MQ数据传输时,发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

Spring默认采用的是JDK序列化方式,这种方式存在以下问题:

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

(1)引入依赖

使用JSON方式来做序列化和反序列化,在发布者和订阅者两个模块都引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意:如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

(2)配置消息转换器

在发布者和订阅者两个模块的启动类都添加以下@Bean

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

其中的消息转换器中添加的messageId可以便于我们将来做幂等性判断

注:发布者发送的什么数据类型的消息,订阅者就使用什么样的数据类型接收