> 技术文档 > RabbitMQ之监听和@RabbitListener解析_simplerabbitlistenercontainerfactory

RabbitMQ之监听和@RabbitListener解析_simplerabbitlistenercontainerfactory


为什么生产者需要创建RabbitTemplate这么一个Bean才能发送消息,而监听却只需要加一个@RabbitListener注解即可

        首先,我们需要回顾在不使用SpringBoot时是怎样使用rabbitmq发送和接受消息的

        生产者实现:
public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost(\"192.168.200.128\"); //发送和接收消息的端口号 factory.setPort(5672); //虚拟主机的地址 factory.setVirtualHost(\"/\"); factory.setUsername(\"root\"); factory.setPassword(\"123321\"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列/* 声明队列 参数1:队列的名称 queueName 参数2:队列是否支持持久化 false:不持久化处理 参数3:队列是否排它:是否允许其它的connection下的channel连接 参数4:是否空闲时自动删除,当最后一个consumer(消费者)断开之后,队列将自动删除。 参数5:参数是rabbitmq的一个扩展,功能非常强大,基本是AMPQ中没有的。 */ String queueName = \"simple.queue\"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = \"hello, rabbitmq!\"; /* 发送消息: 参数1:exchange 交换机 没有就设置为 \"\" 值就可以了 参数2:routingKey 路由的key 现在没有设置key,直接使用队列的名字queueName 参数3:发送数据到队列的时候,是否要带一些参数。直接赋值null即可 参数4:body 向队列中发送的消息数据 */ channel.basicPublish(\"\", queueName, null, message.getBytes()); System.out.println(\"发送消息成功:【\" + message + \"】\"); // 5.关闭通道和连接 channel.close(); connection.close(); }
        生产者实现:
public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost(\"192.168.200.128\"); factory.setPort(5672); factory.setVirtualHost(\"/\"); factory.setUsername(\"root\"); factory.setPassword(\"123321\"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 //TODO:如果MQ中有同名的队列就会使用该队列,没有就会创建队列 String queueName = \"simple.queue\"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息/* 参数1:消费者消费的队列名称 参数2:收到消息后自动应答,通知rabbitmq自动剔除已经被消费的消息 参数3:接口消息的回调:一旦队列下有新的消息,则自动回调DefaultConsumer对象下的handleDelivery方法 把消息以入参传入到该方法中 */ channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope,  AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 //TODO:body就是消费端接收到的消息 String message = new String(body); System.out.println(\"接收到消息:【\" + message + \"】\"); } }); System.out.println(\"等待接收消息。。。。\"); }

        在没有引入spingboot时rabbitmq是通过ConnectionFactory建立连接创建通道channel.basicPublish等方法来发送消息,监听消息也是先建立连接和通道然后通过channel.basicConsume回调函数来监听消息,在引入springboot后发消息通过yaml配置来创建RabbitTemplate的bean对象来发送消息,通过配置文件来创建交换机和队列,那监听是通过什么样的bean来实现的

        在引入 Springboot 后,监听 RabbitMQ 消息主要通过 `SimpleMessageListenerContainer` 或 `DirectMessageListenerContainer` 这两种类型的 Bean 实现        

        SimpleMessageListenerContainer

        工作原理:它是一个常用的消息监听容器,内部维护了一个消费者池,能够并发地处理多个消息。`SimpleMessageListenerContainer` 会从配置的队列中拉取消息,并将消息分发给对应的消息监听器进行处理。
        配置方式:可以通过 Java 配置或 XML 配置的方式创建 `SimpleMessageListenerContainer` Bean。

import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig {    @Bean    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();        container.setConnectionFactory(connectionFactory);        container.setQueueNames(\"QueueName\");        // 设置消息监听器        container.setMessageListener((message) -> {            System.out.println(\"Received message: \" + new String(message.getBody()));        });        return container;    }}

        在上述代码中,创建了一个 `SimpleMessageListenerContainer` Bean,并配置了连接工厂、监听的队列名和消息监听器。      

        DirectMessageListenerContainer

        工作原理:它是另一种消息监听容器,与 `SimpleMessageListenerContainer` 不同的是,`DirectMessageListenerContainer` 直接使用 `Channel` 进行消息的消费,减少了中间层的开销,性能相对较高。
        配置方式:同样可以通过 Java 配置或 XML 配置的方式创建 `DirectMessageListenerContainer` Bean。

import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig {    @Bean    public DirectMessageListenerContainer directMessageListenerContainer(ConnectionFactory connectionFactory) {        DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);        container.setQueueNames(\"QueueName\");        // 设置消息监听器        container.setMessageListener((message) -> {            System.out.println(\"Received message: \" + new String(message.getBody()));        });        return container;    }}

        在这个例子中,创建了一个 `DirectMessageListenerContainer` Bean,并配置了连接工厂、监听的队列名和消息监听器。

        SimpleMessageListenerContainer和DirectMessageListenerContainer区别

        1.基本概念                                                                                                                                         
        在Spring AMQP框架中,`SimpleMessageListenerContainer`和`DirectMessageListenerContainer`都是用于监听RabbitMQ消息队列,消费其中消息的容器。

        2.具体含义

        SimpleMessageListenerContainer:它在消费消息时会经过一些中间层处理。例如,它会使用一个线程池来管理消费者线程,并且在消息处理过程中会涉及一些额外的封装和调度逻辑。这些中间层虽然带来了更多的灵活性和功能,比如可以方便地配置并发消费者数量、消息确认模式等,但也增加了系统的开销。
        DirectMessageListenerContainer:它直接使用`Channel`(RabbitMQ中用于与消息代理进行通信的通道)进行消息的消费。这意味着它绕过了`SimpleMessageListenerContainer`中的一些中间层逻辑,减少了不必要的处理步骤和开销。由于减少了中间层,消息的处理流程更加直接,所以性能相对较高。

        3.额外的封装和调度的含义

        (1)封装

        消息处理封装:`SimpleMessageListenerContainer` 会对从队列接收到的原始消息进行一系列封装。例如,它会把从 RabbitMQ 拿到的字节数据封装成 Java 对象,方便开发者在业务代码里使用。这个过程涉及到数据的反序列化、类型转换等操作,这些额外的处理增加了系统开销。

        资源管理封装:它会管理连接、通道等资源,将这些资源的获取、使用和释放等操作封装起来,让开发者不用手动处理这些底层细节。比如,在获取 `Connection` 和 `Channel` 时,会有相应的逻辑保证资源的正确获取和使用,这也会带来一定的性能损耗。

        (2)调度

        线程调度:`SimpleMessageListenerContainer` 使用线程池来管理消费者线程。它需要调度这些线程来处理消息,包括线程的创建、销毁、任务分配等操作。例如,要根据配置的并发消费者数量,合理地将消息分配给不同的线程处理,这一调度过程会消耗一定的系统资源。

        消息确认调度:在处理消息确认时,它也有相应的调度逻辑。比如设置了手动确认模式,在消息处理完成后,需要调度线程向 RabbitMQ 发送确认消息,确保消息的正确处理和可靠性。

       4. DirectMessageListenerContainer` 基于的技术

`DirectMessageListenerContainer` 通常不依赖传统意义上像 `SimpleMessageListenerContainer` 那样的线程池来调度消费者。它是基于 Reactor 模式,利用 Netty 等异步 I/O 框架来实现的。

异步 I/O:借助异步 I/O 技术,`DirectMessageListenerContainer` 可以在一个线程中高效地处理多个 `Channel` 的消息,而不需要为每个 `Channel` 分配一个单独的线程。当有消息到达时,它会通过异步回调的方式通知相应的处理逻辑,避免了线程的频繁创建和销毁,减少了线程上下文切换的开销。

事件驱动:它采用事件驱动的方式来处理消息。通过监听 `Channel` 上的各种事件(如消息到达、连接关闭等),在事件发生时触发相应的处理逻辑。例如,当 `Channel` 上有新消息到达时,会触发消息消费的事件,然后直接调用相应的消息处理方法,使得消息处理更加高效和直接。

        5.在消息确认机制方面有什么区别

        SimpleMessageListenerContainer的消息确认机制
        `SimpleMessageListenerContainer` 提供了灵活多样的消息确认模式,能满足不同业务场景的需求,主要通过 `acknowledgeMode` 属性进行配置:
        - AUTO(自动确认):这是默认的确认模式。当消息被消费者接收到后,会自动向 RabbitMQ 发送确认信息,告知消息已成功消费。这种模式简单方便,但可能会出现消息丢失的情况,例如消费者在处理消息过程中发生异常崩溃,由于消息已自动确认,RabbitMQ 不会重新发送该消息。
        - MANUAL(手动确认):在手动确认模式下,消费者需要显式地调用 `Channel.basicAck()`、`Channel.basicNack()` 或 `Channel.basicReject()` 方法来向 RabbitMQ 发送确认信息。这为开发者提供了更多的控制权,可以根据业务逻辑灵活处理消息,确保消息的可靠消费。例如,当消费者成功处理消息后,调用 `basicAck` 确认消息;若处理失败,可以选择调用 `basicNack` 或 `basicReject` 让消息重新入队或丢弃。
        - NONE(无需确认):使用这种模式时,RabbitMQ 不会等待消费者的确认信息,直接将消息标记为已消费。这种模式适用于对消息可靠性要求不高的场景,能够提高消息处理的效率。

         DirectMessageListenerContainer的消息确认机制
        `DirectMessageListenerContainer` 同样支持消息确认,且确认模式与 `SimpleMessageListenerContainer` 类似,也是通过配置 `acknowledgeMode` 来实现:
        - 自动确认:消息一旦被接收,就会自动确认,无需消费者额外操作。
        - 手动确认:消费者需要手动调用相应的方法来确认消息。由于 `DirectMessageListenerContainer` 采用异步 I/O 和事件驱动机制,在手动确认时需要特别注意线程安全和消息处理的顺序。

`DirectMessageListenerContainer`直接使用`Channel`消费消息,减少了中间层开销,从而提高了性能。不过,`SimpleMessageListenerContainer`提供了更多的功能和配置选项,适合对灵活性要求较高的场景。

        使用 @RabbitListener 注解:

        除了手动创建监听容器 Bean,Spring AMQP 还提供了 `@RabbitListener` 注解,它会自动创建 `SimpleMessageListenerContainer` 或 `DirectMessageListenerContainer`。只需要在方法上添加该注解,并指定监听的队列即可。

import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class MyRabbitListener {    @RabbitListener(queues = \"yourQueueName\")    public void handleMessage(String message) {        System.out.println(\"Received message: \" + message);    }}

使用 `@RabbitListener` 注解更加简洁,Spring 会自动处理监听容器的创建和管理。

        那使用@RabbitListener根据什么来决定使用SimpleMessageListenerContainer还是DirectMessageListenerContainer

        当使用 `@RabbitListener` 注解时,选择使用 `SimpleMessageListenerContainer` 还是 `DirectMessageListenerContainer` 可以从以下几个方面来考虑:

        1.性能需求
        (1)高吞吐量场景:
         - 如果你有高吞吐量的消息处理需求,`DirectMessageListenerContainer` 是更好的选择。由于它直接使用 `Channel` 进行消息消费,减少了中间层的开销,采用异步 I/O 和事件驱动机制,能在一个线程中高效处理多个 `Channel` 的消息,避免了频繁的线程创建和销毁以及上下文切换,性能相对较高。
        (2)普通业务场景:
        - 如果业务场景对吞吐量要求不是特别高,`SimpleMessageListenerContainer` 提供的功能和灵活性更重要。它通过线程池管理消费者线程,有完善的消息处理封装和调度机制,适合大多数普通业务场景。比如一个小型企业的内部通知系统,消息量相对较少,对灵活性要求较高,使用 `SimpleMessageListenerContainer` 可以方便地进行各种配置。

        2. 资源使用
        (1)资源紧张环境:
     - 在资源紧张的环境下,如服务器内存和 CPU 资源有限,`DirectMessageListenerContainer` 更合适。因为它不依赖传统线程池,减少了线程数量和资源占用,能在有限的资源下高效运行。
        (2)资源充足环境:
     - 当服务器资源充足时,`SimpleMessageListenerContainer` 可以更好地发挥其功能。它虽然会消耗更多的系统资源,但提供了丰富的配置选项和强大的调度功能,能满足复杂业务的需求。

        3.业务复杂度
        (1)简单业务逻辑:
    - 对于业务逻辑简单、对消息处理流程要求不高的场景,`DirectMessageListenerContainer` 能以更简洁的方式实现消息消费。例如,只需要简单地将消息记录到日志文件的场景,直接使用 `Channel` 消费消息即可。
        (2)复杂业务逻辑:
    - 当业务逻辑复杂,需要对消息进行复杂的处理和转换,或者需要灵活配置消息确认模式、并发消费者数量等时,`SimpleMessageListenerContainer` 更具优势。比如在一个金融系统中,消息处理涉及到多个业务规则的验证和计算,需要使用 `SimpleMessageListenerContainer` 的灵活配置来满足需求。       

         4.代码配置
在 Spring 中,可以通过配置不同的 `RabbitListenerContainerFactory` 来决定 `@RabbitListener` 使用哪种容器:

import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitConfig {    // 配置 SimpleMessageListenerContainer 工厂    @Bean    public SimpleRabbitListenerContainerFactory simpleFactory(ConnectionFactory connectionFactory) {        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        factory.setConnectionFactory(connectionFactory);        // 其他配置        return factory;    }    // 配置 DirectMessageListenerContainer 工厂    @Bean    public DirectRabbitListenerContainerFactory directFactory(ConnectionFactory connectionFactory) {        DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();        factory.setConnectionFactory(connectionFactory);        // 其他配置        return factory;    }}

然后在使用 `@RabbitListener` 时,可以通过 `containerFactory` 属性指定使用的工厂:

import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;@Servicepublic class MessageConsumer {    @RabbitListener(queues = \"testQueue\", containerFactory = \"simpleFactory\")    public void simpleConsume(String message) {        System.out.println(\"Simple consume: \" + message);    }    @RabbitListener(queues = \"testQueue\", containerFactory = \"directFactory\")    public void directConsume(String message) {        System.out.println(\"Direct consume: \" + message);    }}

通过上述配置,可以根据具体需求选择合适的容器来处理消息。

        5.默认使用SimpleRabbitListenerContainerFactory

        在 Spring AMQP 里,若没有特别指定 `containerFactory` 属性,`@RabbitListener` 默认使用 `SimpleRabbitListenerContainerFactory` 创建的 `SimpleMessageListenerContainer`。

        原因分析
        Spring AMQP 框架在初始化时,会自动配置一个 `SimpleRabbitListenerContainerFactory` 作为默认的工厂 bean。这是因为 `SimpleMessageListenerContainer` 功能较为全面,具备丰富的配置选项和完善的线程管理、消息处理机制,能适应大多数业务场景,为开发者提供便捷且可靠的消息消费方案。

        可以看到在使用SimpleMessageListenerContainer和DirectMessageListenerContainer时都有ConnectionFactory,那ConnectionFactory从哪来是什么       

        当使用Spring Boot集成消息队列(像RabbitMQ)时,Spring会自动配置`ConnectionFactory`。

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMQConfig {    @Bean    public ConnectionFactory connectionFactory() {        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(\"localhost\");        connectionFactory.setUsername(\"guest\");        connectionFactory.setPassword(\"guest\");        return connectionFactory;    }}

这里的`ConnectionFactory`是Spring AMQP框架中的接口,`CachingConnectionFactory`是Spring提供的具体实现类。