> 文档中心 > RabbitMQ 中的简单通讯,以及其四种交换机的使用

RabbitMQ 中的简单通讯,以及其四种交换机的使用

目录

    • 一个简单的通讯
      • I. 依赖以及配置
      • II. 在 RabbitMQ 主页面中发送消息
      • III. 在 IDEA 中发送消息
    • 四种交换机
      • I. 发布/订阅模式
      • II. Direct
      • III. Fanout
      • IV. Topic
      • V. Header

一个简单的通讯

I. 依赖以及配置

  1. 创建一个 SpringBoot 工程,在创建的时候勾选以下两个依赖
    RabbitMQ 中的简单通讯,以及其四种交换机的使用

  2. 然后在 application.properties 中添加 RabbitMQ 的基本信息

    # RabbitMQ 的端口号spring.rabbitmq.port=5672# 虚拟机的地址spring.rabbitmq.host=192.168.73.129# 虚拟主机spring.rabbitmq.virtual-host=/spring.rabbitmq.username=guestspring.rabbitmq.password=guest

II. 在 RabbitMQ 主页面中发送消息

  1. 创建一个 config 配置文件,在里面配置队列信息

    @Configurationpublic class RabbitConfig {    //定义一个队列的名字    public static final String MY_QUEUE_NAME = "my_queue_name";    @Bean    Queue queue01() { /  * 参数一:队列的名字  * 参数二:队列是否持久化。即当 rabbitMQ 关机重启后是否还存在  * 参数三:排他性。即哪个连接创建该队列,哪个连接才能操作该队列,一般都是 false  * 参数四:是否自动删除。即如果没有人监听该队列,是否自动删除该队列  */ return new Queue(MY_QUEUE_NAME, true, false, false);    }}
  2. 创建消费者,监听 RabbitMQ 中的 Queue(队列)中的消息

    @Componentpublic class MyConsumer {    /     * 监听某一个队列     * @param msg     */    @RabbitListener(queues = RabbitConfig.MY_QUEUE_NAME)    public void handleMsg(String msg) { System.out.println("handleMsg = " + msg);    }}
  3. 然后启动,这时候会发现在 RabbitMQ 主页中的 Connections 中会监听到一个连接
    RabbitMQ 中的简单通讯,以及其四种交换机的使用

  4. 那么怎么通讯呢?很简单,在 Queues 中找到当前自己的队列,即上面自己定义的队列的名字。
    RabbitMQ 中的简单通讯,以及其四种交换机的使用

  5. 点进来之后,在这里就可以发送消息了
    RabbitMQ 中的简单通讯,以及其四种交换机的使用

  6. 然后在 IDEA 的启动日志中就可以看到这样一段信息
    RabbitMQ 中的简单通讯,以及其四种交换机的使用

III. 在 IDEA 中发送消息

  1. 通讯不仅可以在 RabbitMQ 的主页面上通过点击实现消息的发送,在 IDEA 中也可以通过代码来实现

  2. 代码还是上面一样不用变,此时在单元测试中注入 RabbitTemplate ,通过 RabbitTemplate 的方法来实现消息发送

    @AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test01(){ / * 第一个参数是队列的名字 * * 第二个参数是消息内容 */rabbitTemplate.convertAndSend(RabbitConfig.MY_QUEUE_NAME, "Hello World!");}
  3. 在启动类上就可以看到消息的内容
    RabbitMQ 中的简单通讯,以及其四种交换机的使用

四种交换机

I. 发布/订阅模式

  1. 有这样一种情况:一个生产者,多个消费者,每一个消费者都有自己的队列,生产者没有将消息直接发送到队列,而是发送到了交换机。每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
  2. 需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange 上,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力
  3. 在这种情况下,我们有四种交换机交换机可供选择
    • Direct
    • Fanout
    • Topic
    • Header

II. Direct

  1. Direct 交换机也叫直连交换机,DirectExchange 的路由策略是将 消息队列 绑定到一个 DirectExchange上。

  2. 当一条 消息队列 到达 DirectExchange 时会被转发到与该消息 routing key 相同的 Queue(队列)上

  3. 下面开始配置

    (1) 创建一个 config 配置类,在上面进行 Queue、Exchange 等配置

    / * 直连交换机 */@Configurationpublic class DirectConfig {//交换机的名称    public static final String MY_DIRECT_EXCHANGE_NAME = "my_direct_exchange_name";    //队列一    public static final String MY_DIRECT_QUEUE_NAME_01 = "my_direct_queue_name_01";    //队列二    public static final String MY_DIRECT_QUEUE_NAME_02 = "my_direct_queue_name_02";    @Bean    Queue directQueue01() { return new Queue(MY_DIRECT_QUEUE_NAME_01, true, false, false);    }    @Bean    Queue directQueue02() { return new Queue(MY_DIRECT_QUEUE_NAME_02, true, false, false);    }    @Bean    DirectExchange directExchange() { /  * 参数一:交换机的名字  * 参数二:是否具备持久性  * 参数三:是否自动删除  */ return new DirectExchange(MY_DIRECT_EXCHANGE_NAME, true, false);    }    /     * 这个 Bean 的作用就是把交换机与队列连接起来     * @return     */    @Bean    Binding birectBinding01() { return BindingBuilder  //指定要绑定的队列  .bind(directQueue01())  //指定交换机  .to(directExchange())  //要传一个 routingKey,因为自定义容易出错,所以这里就用队列名作为 routingKey  .with(MY_DIRECT_QUEUE_NAME_01);    }    @Bean    Binding birectBinding02() { return BindingBuilder  //指定要绑定的队列  .bind(directQueue02())  //指定交换机  .to(directExchange())  //这里的 routingKey 是自定义的,但我这里避免出错,所以直接就用队列名作为 routingKey 了  .with(MY_DIRECT_QUEUE_NAME_02); }}
    • 首先提供了两个队列(多少个队列都行,我这里是为了与后面几个交换机做比较),然后创建了一个 DirectExchange 对象,该对象即交换机
  • 创建一个 Binding 对象,将 Exchange 和 Queue 绑定在一起

    • 大家会发现,这个案例与前面的在“在IDEA中发送消息”十分相似,其实如果使用的是直连交换机(DirectExchange)的话,DirectExchange 和 Binding 两个 Bean 的配置可以省略,即如果使用 Direct 交换机,可以只配置 Queue 即可。
  • 因为在默认的情况下,使用的就是 Direct 交换机。

    (2) 消费者,在上面配置监听的队列

    @Componentpublic class DirectConsumer {    /     * 监听名为 MY_DIRECT_QUEUE_NAME_01 的队列     * @param msg     */    @RabbitListener(queues = DirectConfig.MY_DIRECT_QUEUE_NAME_01)    public void handleMsg01(String msg) { System.out.println("handleMsg01 = " + msg);    }    /     * 监听名为 MY_DIRECT_QUEUE_NAME_02 的队列     * @param msg     */    @RabbitListener(queues = DirectConfig.MY_DIRECT_QUEUE_NAME_02)    public void handleMsg02(String msg) { System.out.println("handleMsg02 = " + msg); }}
      (3) 与前面的类似,也是在单元测试中注入 RabbitTemplate 进行测试      ```java   @Autowired   RabbitTemplate rabbitTemplate;      @Test   public void test04(){   rabbitTemplate.convertAndSend(DirectConfig.MY_DIRECT_EXCHANGE_NAME,DirectConfig.MY_DIRECT_QUEUE_NAME_02,"Hello Queue02");   }      /    * 参数一:交换机的名字    * 参数二:routing key,我这里设置的是队列的名字    * 参数三:需要发送的消息    */   @Test   public void test03(){    rabbitTemplate.convertAndSend(DirectConfig.MY_DIRECT_EXCHANGE_NAME,DirectConfig.MY_DIRECT_QUEUE_NAME_01,"Hello Queue01");   }

(4) 分别运行就可以在启动类的日志中看到效果了
RabbitMQ 中的简单通讯,以及其四种交换机的使用
RabbitMQ 中的简单通讯,以及其四种交换机的使用

III. Fanout

  1. FanoutExchange — 扇形交换机。FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息都转发给与它绑定的 Queue 上。

  2. 在这种策略中,routingKey 将不起任何作用。

  3. 下面开始配置:

    (1) 与上面的 Direct 交换机也是类似的操作,都是先创建一个 config 去进行配置

    / * 扇形交换机 */@Configurationpublic class FanoutConfig {    public static final String MY_FANOUT_EXCHANGE_NAME = "my_fanout_exchange_name";    public static final String MY_FANOUT_QUEUE_NAME_01 = "my_fanout_queue_name_01";    public static final String MY_FANOUT_QUEUE_NAME_02 = "my_fanout_queue_name_02";    @Bean    Queue fanoutQueue01() { return new Queue(MY_FANOUT_QUEUE_NAME_01, true, false, false);    }    @Bean    Queue fanoutQueue02() { return new Queue(MY_FANOUT_QUEUE_NAME_02, true, false, false);    }    @Bean    FanoutExchange fanoutExchange() { return new FanoutExchange(MY_FANOUT_EXCHANGE_NAME, true, false);    }/     * 注意,扇形交换机这里是与别的交换机不同的     * @return     */    @Bean    Binding fanoutBinding01() { return BindingBuilder  .bind(fanoutQueue01())  .to(fanoutExchange());    }    @Bean    Binding fanoutBinding02() { return BindingBuilder  .bind(fanoutQueue02())  .to(fanoutExchange());    }}

    (2) 其实步骤与 Direct 差不多,只是在队列与交换机绑定那里不一样而已。

    (3) 消费者,在上面配置监听的队列

    @Componentpublic class FanoutConsumer {    @RabbitListener(queues = FanoutConfig.MY_FANOUT_QUEUE_NAME_01)    public void handleMsg01(String msg) { System.out.println("handleMsg01 = " + msg);    }    @RabbitListener(queues = FanoutConfig.MY_FANOUT_QUEUE_NAME_02)    public void handleMsg02(String msg) { System.out.println("handleMsg02 = " + msg);    }}

    (4) 单元测试

    @AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test05(){    rabbitTemplate.convertAndSend(FanoutConfig.MY_FANOUT_EXCHANGE_NAME,null,"Hello Fanout!");}

    (5) 在启动类的日志中看到效果,与 Fanout 交换机连接的队列都接受到消息
    RabbitMQ 中的简单通讯,以及其四种交换机的使用

IV. Topic

  1. TopicExchange — 主题交换机。TopicExchange 是比较复杂但是也比较灵活的一种路由策略

  2. 在 TopicExchange 中,Queue 通过绑定 routingKey 连接到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息中的 routingKey 将消息路由到一个或者多个 Queue 上。

  3. 说白了,就是可以在 TopicExchange 中灵活的配置一些自定义内容

  4. 下面开始配置

    (1) 创建一个 config 进行配置

    / * 主题交换机 */@Configurationpublic class TopicConfig {    public static final String XIAOMI_QUEUE = "xiaomi_queue";    public static final String HUAWEI_QUEUE = "huawei_queue";    public static final String PHONE_QUEUE = "phone_queue";    public static final String TOPIC_EXCHAGNE_NAME = "topic_exchagne_name";    @Bean    Queue xiaomiQueue() { return new Queue(XIAOMI_QUEUE, true, false, false);    }    @Bean    Queue huaweiQueue() { return new Queue(HUAWEI_QUEUE, true, false, false);    }    @Bean    Queue phoneQueue() { return new Queue(PHONE_QUEUE, true, false, false);    }    @Bean    TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHAGNE_NAME, true, false);    }    @Bean    Binding huaweiBinding() { return BindingBuilder  .bind(huaweiQueue())  .to(topicExchange())  //与其他交换机不一样的是,这里是可以设置通配符"#"的,即 huawei.xxx 都将转发到 huaweiQueue() 这个方法所对应的队列中  .with("huawei.#");    }    @Bean    Binding xiaomiBinding() { return BindingBuilder  .bind(xiaomiQueue())  .to(topicExchange())  .with("xiaomi.#");    }    @Bean    Binding phoneBinding() { return BindingBuilder  .bind(phoneQueue())  .to(topicExchange())  //这个即 xxx.phone.xxx 都将转发到 phoneQueue() 这个方法对应的队列中  .with("#.phone.#");    }}

    (2) TopicExchange 主题交换机是可以设置 # 通配符的,所以可以对 routingKey 进行自定义

    (3) 消费者,在上面配置监听的队列

    @Componentpublic class TopicConsumer {    @RabbitListener(queues = TopicConfig.XIAOMI_QUEUE)    public void xiaomi(String msg) { System.out.println("xiaomi = " + msg);    }    @RabbitListener(queues = TopicConfig.HUAWEI_QUEUE)    public void huawei(String msg) { System.out.println("huawei = " + msg);    }    @RabbitListener(queues = TopicConfig.PHONE_QUEUE)    public void phone(String msg) { System.out.println("phone = " + msg);    }}

    (4) 单元测试

    @AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test06(){    rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHAGNE_NAME,"xiaomi.news","小米新闻");rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHAGNE_NAME,"huawei.news","华为新闻");rabbitTemplate.convertAndSend(TopicConfig.TOPIC_EXCHAGNE_NAME,"xiaomi.phone.news","小米手机新闻");}

    (5) 运行效果
    RabbitMQ 中的简单通讯,以及其四种交换机的使用

V. Header

  1. Header 交换机。这是一种使用比较少的路由策略,HeaderExchange 会根据消息的 Header 不用,从而将消息发送到不同的 Queue 上。

  2. 这种策略也和 routingKey 无关

  3. 下面开始配置

    (1) 创建一个 config 进行配置

    / * Header 交换机 */@Configurationpublic class HeaderConfig {    public static final String MY_HEADER_EXCHANGE_NAME = "my_header_exchange_name";    public static final String MY_HEADER_QUEUE_NAME_01 = "my_header_queue_name_01";    public static final String MY_HEADER_QUEUE_NAME_02 = "my_header_queue_name_02";    @Bean    Queue headerQueue01() { return new Queue(MY_HEADER_QUEUE_NAME_01, true, false, false);    }    @Bean    Queue headerQueue02() { return new Queue(MY_HEADER_QUEUE_NAME_02, true, false, false);    }    @Bean    HeadersExchange headersExchange() { return new HeadersExchange(MY_HEADER_EXCHANGE_NAME, true, false);    }    @Bean    Binding headerBinding01() { return BindingBuilder  .bind(headerQueue01())  .to(headersExchange())  //如果发送来的消息中,消息头有一个 name 字段,就将这个消息转发到 headerQueue01() 方法对应的队列中  .where("name").exists();    }    @Bean    Binding headerBinding02() { return BindingBuilder  .bind(headerQueue02())  .to(headersExchange())  //如果发送来的消息中,消息头有一个 age 字段,且值为 18,就将这个消息转发到 headerQueue02() 方法对应的队列中  .where("age").matches(18);    }}

    (2) Header 交换机没有 routingKey。继续配置消费者

    @Componentpublic class HeaderConsumer {    /     * 这里不就不能用 字符串 来接收了     * 因为这里获取的是一个 Header ,即消息头,所以是没有办法自动帮我解析成字符串的     * 所以呢,这里只能使用一个 byte[] 来接收     * 一会儿传值的时候就通过一个 message 对象来设置消息头等信息     * @param msg     */    @RabbitListener(queues = HeaderConfig.MY_HEADER_QUEUE_NAME_01)    public void handleMsg01(byte[] msg) { System.out.println("handleMsg01 " + new String(msg, 0, msg.length));    }    @RabbitListener(queues = HeaderConfig.MY_HEADER_QUEUE_NAME_02)    public void handleMsg02(byte[] msg) { System.out.println("handleMsg02 " + new String(msg, 0, msg.length));    }}

    (3) 注意的是,Header 交换机是通过判断消息头中的字段来觉得是否转发到队列的,所以这里还不能用字符串来接收,因为没法解析。这里使用 byte[] ,然后手动转成字符串

    (4) 单元测试中测试。不同的是,这里是通过 messageBuilder 来构建一个 message 对象来进行传值的。与上面几种的纯文本传值不一样

    @AutowiredRabbitTemplate rabbitTemplate;@Testpublic void test07(){Message msg01 = MessageBuilder//字符串转成 byte[].withBody("hello header!".getBytes())//设置请求头,转换器就是通过这里的值来判断的.setHeader("name", "peng").build();Message msg02 = MessageBuilder.withBody("hello header age".getBytes()).setHeader("age", 99).build();Message msg03 = MessageBuilder.withBody("hello header name with age".getBytes()).setHeader("name", "libai").setHeader("age", 99).build();rabbitTemplate.send(HeaderConfig.MY_HEADER_EXCHANGE_NAME,null,msg01);rabbitTemplate.send(HeaderConfig.MY_HEADER_EXCHANGE_NAME,null,msg02);rabbitTemplate.send(HeaderConfig.MY_HEADER_EXCHANGE_NAME,null,msg03);}

    (5) 运行效果
    RabbitMQ 中的简单通讯,以及其四种交换机的使用