> 文档中心 > RabbitMQ工作模式

RabbitMQ工作模式


✨ RabbitMQ工作模式

  • Work queues工作队列模式
    • 基本介绍
    • 编写代码
      • 抽取公共部分,写一个工具类
      • 生产者
      • 消费者1
      • 消费者2
    • 测试
  • 发布订阅模式
    • 订阅模式类型
    • 代码编写
      • 生产者
      • 消费者1
      • 消费者2
    • 测试
    • 发布订阅模式和工作队列模式的区别
  • Routing 路由模式
    • 基本介绍
    • 编写代码
      • 生产者
      • 消费者1
      • 消费者2
  • Topics 通配符模式
    • 基本介绍
    • 代码
      • 生产者
      • 消费者1
      • 消费者2

📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

Work queues工作队列模式

基本介绍

RabbitMQ工作模式

  • 多个消费者共同消费同一个队列中的消息,可以实现快速消费,避免消息积压,但是多个消费者消费队列的消息的时候,是互斥的,同一个消息只能被一个消费者消费,不可以被多个消费者消费。
  • 应用:对于一些任务比较多的情况,使用工作队列可以提高任务处理的速度

编写代码

抽取公共部分,写一个工具类

  • 我们知道像连接等操作,其实基本上代码都是一样的,每一次写重复的代码其实没有什么意义,我们可以写一个工具类来封装这些操作。
public class ConnectionUtil {    public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("192.168.137.118"); //端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/"); factory.setUsername("admin"); factory.setPassword("123456"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection;    }

生产者

public class Producer {    static final String QUEUE_NAME = "work_queue";    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i = 1; i <= 10; i++) {     String body = i+"hello rabbitmq~~~";     channel.basicPublish("",QUEUE_NAME,null,body.getBytes()); } channel.close(); connection.close();    }}

RabbitMQ工作模式

消费者1

public class Consumer1 {    static final String QUEUE_NAME = "work_queue";    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));     } }; channel.basicConsume(QUEUE_NAME,true,consumer);    }}

消费者2

代码和上面相同

测试

我们先启动两个消费者,然后再启动生产者发送消息,到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。

发布订阅模式

订阅模式类型

RabbitMQ工作模式

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特定队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • 发布订阅模式
    • 每个消费者都监听自己的队列
    • 生产者把消息发送给broker,然后交换机把消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
  • 交换机只负责转发消息,并没有存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

代码编写

生产者

public class Producer {    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); /*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments)参数: 1. exchange:交换机名称 2. type:交换机类型     DIRECT("direct"),:定向     FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。     TOPIC("topic"),通配符的方式     HEADERS("headers");参数匹配 3. durable:是否持久化 4. autoDelete:自动删除 5. internal:内部使用。 一般false 6. arguments:参数 */ String exchangeName = "test_fanout"; //5. 创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); //6. 创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); //7. 绑定队列和交换机 /* queueBind(String queue, String exchange, String routingKey) 参数:     1. queue:队列名称     2. exchange:交换机名称     3. routingKey:路由键,绑定规则  如果交换机的类型为fanout ,routingKey设置为""  */ channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; //8. 发送消息 channel.basicPublish(exchangeName,"",null,body.getBytes()); //9. 释放资源 channel.close(); connection.close();    }}

消费者1

public class Consumer1 {    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; Consumer consumer = new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("将日志信息打印到控制台.....");     } }; channel.basicConsume(queue1Name,true,consumer);    }}

消费者2

public class Consumer2 {    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_fanout_queue2"; Consumer consumer = new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("将日志信息打印到控制台.....");     } }; channel.basicConsume(queue2Name,true,consumer);    }}

RabbitMQ工作模式

测试

启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;达到广播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:
RabbitMQ工作模式

发布订阅模式和工作队列模式的区别

  • 工作队列模式不需要定义交换机,而发布订阅模式需要定义交换机
  • 发布订阅模式需要设置队列和交换机的绑定,而工作队列模式不需要设置,事实上是因为工作队列模式会把队列绑定到默认的交换机
  • 发布订阅模式中,生产者向交换机发送消息;工作队列模式则是生产者向队列发送消息(底层使用默认交换机)

Routing 路由模式

基本介绍

RabbitMQ工作模式

  • P:生产者,向交换机发送消息的时候,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息传递给和routing key完全匹配的队列
  • C1:消费者,它所在队列指定了需要routing key为error的信息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

路由模式的特点

  • 队列和交换机的绑定是需要指定routing key的,不可以随意绑定
  • 消息的发送方向交换机发送消息的时候,也需要指定消息的routing key
  • 交换机不再把消息交给每一个绑定的队列,而是根据消息的routing key来进行判断,只有队列的routing key和消息的routing key完全一样才会接收到消息。

编写代码

生产者

public class Producer {    public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();String exchangeName = "test_direct";// 创建交换机channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);// 创建队列String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2"; // 声明(创建)队列channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 队列绑定交换机 // 队列1绑定error channel.queueBind(queue1Name,exchangeName,"error"); // 队列2绑定info error warning channel.queueBind(queue2Name,exchangeName,"info"); channel.queueBind(queue2Name,exchangeName,"error"); channel.queueBind(queue2Name,exchangeName,"warning"); String message = "日志信息:张三调用了delete方法.错误了,日志级别warning"; // 发送消息 channel.basicPublish(exchangeName,"warning",null,message.getBytes()); System.out.println(message); channel.close(); connection.close();    }}

运行RabbitMQ工作模式

RabbitMQ工作模式

消费者1

public class Consumer1 {    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; Consumer consumer = new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("将日志信息打印到控制台.....");     } }; channel.basicConsume(queue1Name,true,consumer);    }}

消费者2

public class Consumer2 {    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_direct_queue2"; Consumer consumer = new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));  System.out.println("将日志信息存储到数据库.....");     } }; channel.basicConsume(queue2Name,true,consumer);    }}

Topics 通配符模式

基本介绍

  • Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则:
    • #:匹配0个或者多个词
    • *:刚好可以匹配一个词

RabbitMQ工作模式
RabbitMQ工作模式

代码

生产者

public class Producer {    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 绑定队列和交换机 /**  *  参数:      1. queue:队列名称      2. exchange:交换机名称      3. routingKey:路由键,绑定规则   如果交换机的类型为fanout ,routingKey设置为""  */ // routing key  系统的名称.日志的级别。 //需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name,exchangeName,"#.error"); channel.queueBind(queue1Name,exchangeName,"order.*"); channel.queueBind(queue2Name,exchangeName,"*.*"); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; //发送消息goods.info,goods.error channel.basicPublish(exchangeName,"order.info",null,body.getBytes()); channel.close(); connection.close();    }}

RabbitMQ工作模式

消费者1

public class Consumer1 {    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_topic_queue1"; Consumer consumer = new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));     } }; channel.basicConsume(queue1Name,true,consumer);    }}

消费者2

public class Consumer2 {    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_topic_queue2"; Consumer consumer = new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("body:"+new String(body));     } }; channel.basicConsume(queue2Name,true,consumer);    }}

杭州女装网