MQ学习笔记1(基础)
本文对RabbiteMQ的学习做一个总结,希望能加深巩固我对MQ的运行机制和MQ的一些高级特性,以及消息队列的应用场景和对应的解决方案,为微服务应用开发打下坚实的基础!
分为两篇文章,本篇为基础篇,介绍了MQ的机制和基本应用。进阶篇介绍了MQ的一些高级特性以及如何保证消息的可靠性和延迟消息的实现
1.常见MQ对比
2.安装RabbitMQ
Docker一条命令安装
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \ # 15672是web控制台端口
-p 5672:5672 \ # 5672是MQ的收发消息端口
--network hmall \ # 这里配置网络名,可以使用命令先创建好网络 docker network creat <网络>
-d \
rabbitmq:3.8-management
3.RabbitMQ架构图
4.交换机类型
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!以下是四种交换机类型:
Fanout:广播,将消息交给所有绑定到交换机的队列。
Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
Headers:头匹配,基于MQ的消息头匹配,用的较少。
4.1 Direct交换机
Direct与Fanout交换机的差异
描述下Direct交换机与Fanout交换机的差异?
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
4.2 Topic交换机
Direct交换机与Topic交换机的差异?
Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词‘*’:代表1个词
5.RabbitMQ的使用
(1)引入SpringAMQP
<!--消息发送-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)配置MQ地址
spring:
rabbitmq:
host: 192.168.150.101 # 你的主机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
创建队列 : new Queue类
创建交换机 : new Exchange类(接口) –> 子类有TopicExchange、FanoutExchange、DirectExchange等
绑定交换机和队列 : 使用BindingBuilder来创建Binding对象来绑定交换机和队列
5.1 发布者/生产者模块
(1)发布者和订阅者都要引入依赖
<!--消息发送-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)调用API发送消息
// 首先注入RabbitTemplate对象
@Autowired
private RabbitTemplate rabbitTemplate;
// 然后在方法中就可以调用API进行消息发送了
rabbitTemplate.convertAndSend(exchangeName, "RoutingKey", message);
5.2 订阅者/消费者模块
5.2.1基于@Bean方式声明交换机和队列
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机,也可以是其他类型的交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
5.2.2基于注解方式声明交换机和队列
使用Direct类型交换机和队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"), // 队列声明
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), // 交换机声明
key = {"red", "blue"} // RoutingKey声明
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
使用Topic类型交换机和队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "china.#" // Topic交换机使用通配符指定Key
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
6. 消息转换器(配置json消息转换器)
在MQ数据传输时,发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
Spring默认采用的是JDK序列化方式,这种方式存在以下问题:
数据体积过大
有安全漏洞
可读性差
(1)引入依赖
使用JSON方式来做序列化和反序列化,在发布者和订阅者两个模块都引入依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意:如果项目中引入了spring-boot-starter-web
依赖,则无需再次引入Jackson
依赖。
(2)配置消息转换器
在发布者和订阅者两个模块的启动类都添加以下@Bean
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
其中的消息转换器中添加的messageId可以便于我们将来做幂等性判断。
注:发布者发送的什么数据类型的消息,订阅者就使用什么样的数据类型接收