RabbitMQ
安装
docker run \
-e RABBITMQ_DEFAULT_USER=用户名 \
-e RABBITMQ_DEFAULT_PASS=密码 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network 网络名\
-d \
rabbitmq:3.8-management
15672:控制台端口
5672:收发消息的端口
概念:
- publisher:生产者,也就是发送消息的一方
- consumer:消费者,也就是消费消息的一方
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
- virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

JAVA客户端-SpringAMQP
-
引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency> -
配置
application.yaml文件spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码 -
然后在类中注入容器
发送消息:
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}接收消息:
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
Work Queue
任务模型,让多个消费者绑定到一个队列,共同消费队列中的消息。同一条消息只能被一个消费者处理。
但是默认情况下,每个消费者分配到的消息数都是平均的,不会因消费者的处理速度差异而改变,这会导致效率降低
在spring中有一个简单的配置,可以解决这个问题:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
交换机
接受发送者发送的消息,并将消息路由到与其绑定的队列。
Fanout交换机 - 广播
会将接收到的消息路由到每一个跟其绑定的队列,也叫广播模式
- 1) 可以有多个队列
- 2) 每个队列都要绑定到
Exchange(交换机) - 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
发送消息:
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
// 三个值,交换机名,交换机密码,信息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
接收消息:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
Direct交换机 - 定向
会将接收到的消息根据规则路由到指定的队列,称为定向路由
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey(路由key) - 消息的发送方在 向
Exchange发送消息时,也必须指定消息的RoutingKey。 Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息
发送消息
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
接收消息
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
Topic交换机 - 通配符订阅
基于RoutingKey做消息路由,但是routingKey通常是多个单词的组合,并 且以.分割
队列与交换机指定BindingKey时可以使用通配符:
#:代指0个或多个单词*:代指一个单词
在交换机绑定队列时,指定对应的BindingKey
例子:
topic.queue1:绑定的是china.#,凡是以china.开头的routing key都会被匹配到,包括:china.newschina.weather
发送消息
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "hmall.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
接收消息
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
声明队列和交换机
SpringAMQP提供了类来声明队列、交换机及其绑定关系:
Queue:用于声明队列,可以用工厂类QueueBuilder构建Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
基于Bean声明
两种创建方式:
示例,声明Fanout交换机
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange() {
// return new FanoutExchange("hmall.fanout");
return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
}
@Bean
public Queue fanoutQueue1() {
// return new Queue("fanout.queue1");
return QueueBuilder.durable("fanout.queue1").build();
}
@Bean
public Binding fanoutQueue1Binding(Queue fanoutQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
}
}
基于注解声明
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red, blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
}
消息转换器
默认情况下Spring采用的序列化方式是JDK序列化。JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
配置JSON转换器
-
在接收和发送消息的服务中加入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意:如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
-
配置消息转换器,在接收和发送消息的服务的启动类中加入
Bean:@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
消费者接收Object
publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:
@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}