Skip to content

MQ消息队列

初识MQ

同步调用

同步调用:微服务间基于Feign的调用

同步调用的优点:

  • 时效性较强,可以立即得到结果

同步调用的问题:

  1. 耦合度高
  2. 性能和吞吐能力下降
  3. 有额外的资源消耗
  4. 有级联失败问题

异步调用

异步通信的优点:

  1. 耦合度低
  2. 吞吐量提升
  3. 故障隔离
  4. 流量削峰

异步通信的缺点:

  1. 依赖于Broker的可靠性、安全性、吞吐能力
  2. 架构复杂了,业务没有明显的流程线,不好追踪管理

什么是MQ

MessageQueue 消息队列:存放消息的队列。(图中的Broker)

image-20240228105607440

RabbitMQ

RabbitMQ概述

RabbitMQ是基于Erlang语言开发的开源消息通信中间件。

https://www.rabbitmq.com/

RabbitMQ的结构:

  1. channel:操作MQ的工具
  2. exchange:路由消息到队列中
  3. queue:缓存消息
  4. virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

RabbitMQ部署

在线拉取下载镜像

sh
docker pull rabbitmq:3-management

安装MQ:执行下面的命令来运行MQ容器:

sh
docker run \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=admin \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management
  • 消息队列端口:5672
  • MQ后台端口:15672

SpringAMQP

SpringAMQP概述

https://spring.io/projects/spring-amqp

AMQP:Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP:基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

简单队列

1、引入依赖

xml
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置mq连接

yaml
spring:
  rabbitmq:
    host: 20.27.55.226 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: admin # 密码

3、消费者配置队列

java
@Bean
public Queue objectQueue(){
    return new Queue("simple.queue");
}

4、消费者监听

java
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) {
        System.out.println("spring 消费者接收到消息 :" + msg);
    }
}

5、生产者发送到队列

convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者在消费者配置中声明一个队列,然后才能convertAndSend

java
@Test
void testSimpleQueue() {
    String queueName = "simple.queue";
    String msg = "wmh likes rabbit";
    rabbitTemplate.convertAndSend(queueName, msg);
}

工作队列

相当于简单队列的负载均衡版本。

消费者会预确认消息再处理,设置preFetch这个值,可以控制预取消息的上限

yaml
spring:
  rabbitmq:
    host: 20.27.55.226 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: admin # 密码
    listener:
      simple:
        # 每次只能获取一条消息,处理完成才能获取下一个消息
        prefetch: 1

Fanout 广播

消费者:

java
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.q1"),
            exchange = @Exchange(name = "wmh.fanout", 
                                 type = ExchangeTypes.FANOUT)
    ))
    public void listenFanoutQueue1(String msg) {
        System.out.println(msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.q2"),
            exchange = @Exchange(name = "wmh.fanout", 
                                 type = ExchangeTypes.FANOUT)
    ))
    public void listenFanoutQueue2(String msg) {
        System.out.println(msg);
    }
}

生产者:

java
for (int i = 0; i < 10; i++) {
    String exchange = "wmh.fanout";
    String msg = "wmh likes fanout! - " + i;
    rabbitTemplate.convertAndSend(exchange, "", msg);
}

Direct 指定路由

会将接收到的消息根据规则路由到指定的Queue

消费者:

java
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.q1"),
            exchange = @Exchange(name = "wmh.direct", 
                                 type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue1(String msg) {
        System.out.println(msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.q2"),
            exchange = @Exchange(name = "wmh.direct", 
                                 type = ExchangeTypes.DIRECT),
            key = {"blue", "yellow"}
    ))
    public void listenDirectQueue2(String msg) {
        System.out.println(msg);
    }
}

生产者:

java
@Test
void testDirectQueue() {
    String exchange = "wmh.direct";
    String msg = "wmh likes direct!";
    rabbitTemplate.convertAndSend(exchange, "yellow", msg);
}

Topic 匹配路由

image-20240228181544957

生产者:

java
@Test
void testSimpleQueue() {
    String exchange = "wmh.topic";
    String msg = "wmh likes direct!";
    rabbitTemplate.convertAndSend(exchange, "china.news", msg);
}

消费者:

java
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.q1"),
    exchange = @Exchange(name = "wmh.topic", 
                         type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue1(String msg) {
    System.out.println(msg);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.q2"),
    exchange = @Exchange(name = "wmh.topic", 
                         type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue2(String msg) {
    System.err.println(msg);
}

消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

Spring的对消息对象的处理默认实现是基于JDK的ObjectOutputStream完成序列化。

修改序列化方式需要定义一个MessageConverter 类型的Bean

1、引入依赖

xml
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

2、生产者和消费者声明MessageConverter

java
@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter(); 
}

3、消费者配置队列

java
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

4、消费者配置监听

java
@RabbitListener(queues = "object.queue")
public void listenJsonQueue(User user) {
    System.out.println(user.toString());
}

5、生产者发送消息

java
void testJsonQueue() {
    String queue = "object.queue";
    User wmh = new User(114514, "wmh");
    rabbitTemplate.convertAndSend(queue, wmh);
}

Released under the MIT License.