> 文档中心 > 必学消息队列-RabbitMQ(上集)

必学消息队列-RabbitMQ(上集)


个人简介

作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门。

文章目录

    • 个人简介
    • RabbitMQ(上集)
      • 什么是RabbitMQ
        • MQ的特点
        • MQ的使用场景
        • 各种MQ对比
      • 安装RabbitMQ环境
      • RabbitMQ的5种模型(重点)
        • 导入依赖
        • 基本消息模型(hello world)
        • work queue模型
          • 未实现能者多劳机制
          • 实现了能者多劳机制
        • fanout模型(广播模型)性能最好
        • direct模型(直连)(默认)
        • topic模型(通配符)

RabbitMQ(上集)

什么是RabbitMQ

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

MQ的特点

  • MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。
  • MQ遵循了AMQP协议的具体实现和产品。

MQ的使用场景

  • 在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
  • 异步处理(常用)
  • 应用解耦(常用)
  • 流量削峰(常用)

各种MQ对比

在目前主流的消息队列中有(ActiveMQ,RocketMQ,RabbitMQ,kafka)

RabbitMQ在上面的各种消息队列中对于消息的保护是十分到位的(不会丢失消息),相对于kafka,虽然kafka性能十分强悍,在大数据中处理海量数据游刃有余,但是kafka容易丢失消息,而RabbitMQ虽然性能不及kafka,但是也不会很差,对于消息要求完整性很高的系统中用RabbitMQ十分好。

安装RabbitMQ环境

总教程:https://www.cnblogs.com/saryli/p/9729591.html

1.安装erlang

(1.)下载erlang

官网地址:https://www.erlang.org/

下载教程:https://www.cnblogs.com/minily/p/7398445.html

(2.)配置erlang环境

配置教程:https://blog.csdn.net/g6256613/article/details/80191402

需要配置环境变量

必学消息队列-RabbitMQ(上集)

必学消息队列-RabbitMQ(上集)

(3.)检查是否安装成功

打开cmd,输入erl,有输出说明成功

(4.)下载rabbitMQ

下载地址:https://www.cnblogs.com/saryli/p/9729591.html

。。。。。。。。。。。。省略,在总教程都有。

(5.)最后访问http://localhost:15672,如果访问成功,说明rabbitMQ安装成功

RabbitMQ的5种模型(重点)

导入依赖

 <dependency>     <groupId>com.rabbitmq</groupId>     <artifactId>amqp-client</artifactId>     <version>5.7.3</version> </dependency>

基本消息模型(hello world)

生产者

public class provider {    /**     * 最基本的消息队列模型     *     * 消息生产者     * @param args     */    public static void main(String[] args) throws IOException, TimeoutException { //1.先new一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //定义指定rabbitmq配置的工厂 factory.setUsername("ems"); factory.setPassword("123456"); factory.setVirtualHost("/ems"); //虚拟主机 factory.setHost("127.0.0.1");  //rabbitMQ的主机名(ip) //2.通过连接工厂创建一个connection Connection connection = factory.newConnection(); //3.通过connection对象create一个channel通道,以后我们的操作就是channel Channel channel = connection.createChannel(); //4.声明队列,如果没有这个队列则会自动生成 /**  *   queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)  *   参数1:队列名字  *   参数2:队列是否持久化  *   参数3:是否排斥(也就是一个队列是否只能由一个消费者消费)  *   参数4:自动删除,当所有消费者消费完之后是否把队列删除  *   参数5:额外参数  */ channel.queueDeclare("hello",true,false,false,null); //5.发布消息 /**  * 参数1:交换机名称,空字符串代表使用默认交换机。。。。  * 参数2:路由键(在没有指定交换机的情况下(不包括空字符串),路由键是发送消息队列的名字  * 参数3:额外参数===通常用MessageProperties.PERSISTENT_TEXT_PLAIN,意思是发送的消息在没有消费完也能持久化  * *****参数4(最重要):发送的消息内容(要转换成byte类型)  */ channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"第一个RabbitMQ程序!!!".getBytes()); channel.close(); connection.close();    }}

消费者

public class comsumer {    /**     * 消息消费者     */    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //这里的配置参数一定要和生产者一模一样,不然会报错 channel.queueDeclare("hello",true,false,false,null); //进行消费 /**  * 参数1:队列名字  * 参数2:是否自动确认消息  * 参数3:通常用DefaultConsumer匿名内部类,实现handleDelivery接收消息  */ channel.basicConsume("hello",true,new DefaultConsumer(channel){     /**      *参数3:接收的消息      */     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("=======消费者取出消息===>"+new String(body));     } }); /**  * 消费者端最好不要关闭channel和connection,不然可能读取不到消息  */// channel.close();// connection.close();    }}

work queue模型

为什么会引入这么一个消息队列模型????

我们可以想象一下,如果按照第一个模型,点对点的,生产者发消息经过消息队列再到消费者,此时消费者只有1个,如果我们生产者发送60条消息,假设每条消息要1秒钟才能执行完,那么hello world模型就要60秒才能消费完所有消息,如果我们用workqueue模型呢,我们假如再引入一个消费者,也就是1个生产者发送60条信息到2个消费者,默认负载均衡,每个队列处理30条,而且还是异步处理,那么我们只需要30秒就处理好了,效率大大的提高

未实现能者多劳机制
public class provider {    /**     * 生产者     * ====workQueue模型     */    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/ems"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //生产者声明了队列,消费者也都要声明 channel.queueDeclare("workqueue",true,false,false,null); //basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) for (int i = 0; i < 10; i++) {     channel.basicPublish("","workqueue",null,("hello=="+i+"").getBytes()); } channel.close(); connection.close();    }}
public class comsumer1 {    /**     * 消费者1     * @param args     */    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); connectionFactory.setHost("127.0.0.1"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) channel.queueDeclare("workqueue",true,false,false,null); //basicConsume(String queue, boolean autoAck, Consumer callback) channel.basicConsume("workqueue",true,new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("===comsumer1===>"+new String(body));     } });    }}
public class comsumer2 {    /**     * 消费者2     */    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("workqueue",true,false,false,null); channel.basicConsume("workqueue",true,new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("===comsumer2===>"+new String(body));     } });    }}

输出结果:(默认是类似负载均衡的轮询算法)

===comsumer1===>hello==0===comsumer1===>hello==2===comsumer1===>hello==4===comsumer1===>hello==6===comsumer1===>hello==8
实现了能者多劳机制

要实现能者多劳,只需要在消费者修改几处代码即可

1. channel.basicQos(1);

2. channel.basicConsume(“workqueue”,false,new DefaultConsumer(channel)

3. channel.basicAck(envelope.getDeliveryTag(),false); //手动确认

public class comsumer1 {    /**     * 消费者1 能者多劳     * @param args     */    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); connectionFactory.setHost("127.0.0.1"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //每次收到一条消息 channel.basicQos(1); //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) channel.queueDeclare("workqueue",true,false,false,null); //basicConsume(String queue, boolean autoAck, Consumer callback) channel.basicConsume("workqueue",false,new DefaultConsumer(channel){ //第二个参数修改为false,取消自动avk     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("===comsumer1===>"+new String(body));  channel.basicAck(envelope.getDeliveryTag(),false); //手动确认     } });    }}
public class comsumer2 {    /**     * 消费者2 能者多劳     */    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //每次只能收一条消息 channel.basicQos(1); channel.queueDeclare("workqueue",true,false,false,null); channel.basicConsume("workqueue",false,new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("===comsumer2===>"+new String(body));  channel.basicAck(envelope.getDeliveryTag(),false); //手动确认消息     } });    }}

输出结果:

===comsumer1===>hello==0===comsumer1===>hello==6===comsumer1===>hello==8

fanout模型(广播模型)性能最好

特点:凡是和这个fanout交换机绑定的临时队列,都能收到消息

public class provider {    /**     * fanout模型(广播模型)     *     */    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); connectionFactory.setHost("127.0.0.1"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //生产者声明交换机==>exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map arguments) /**  * exchangeDeclare:  * 参数一:交换机名字  * 参数二:交换机类型:  * 有这几种类型:""   , "fanout" , "direct" ,  "topic"  * 参数三:交换机是否持久化。(重启rabbitmq服务如果交换机没有删除就是持久化)  * 参数四:是否自动删除  * 参数五:额外参数  */ channel.exchangeDeclare("hello_exchange_fanout","fanout",true,false,null); //这里不用声明消息队列,只需要声明交换机即可,消费者需要声明消息队列(临时队列) //basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) channel.basicPublish("hello_exchange_fanout","",null,"exchange_fanout".getBytes()); channel.close(); connection.close();    }}
public class comsumer1 {    /**     * fanout模型(广播模型)     *     */    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //声明交换机 /**  * exchangeDeclare:  * 参数一:交换机名字  * 参数二:交换机类型:  * 有这几种类型:""   , "fanout" , "direct" ,  "topic"  * 参数三:交换机是否持久化。(重启rabbitmq服务如果交换机没有删除就是持久化)  * 参数四:是否自动删除  * 参数五:额外参数  */ channel.exchangeDeclare("hello_exchange_fanout","fanout",true,false,null); //创建一个临时队列 String queueName = channel.queueDeclare().getQueue(); //把交换机和临时队列绑定在一起 //queueBind(String queue, String exchange, String routingKey) channel.queueBind(queueName,"hello_exchange_fanout",""); //然后就可以通信了 //basicConsume(String queue, boolean autoAck, Map arguments, Consumer callback) channel.basicConsume(queueName,true,new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println(new String(body));     } });    }}
public class comsumer2 {    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("hello_exchange_fanout","fanout",true,false,null); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,"hello_exchange_fanout",""); channel.basicConsume(queueName,true,new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println(new String(body));     } });    }}

direct模型(直连)(默认)

特点:根据路由键直接匹配

fanout、direct、topic 交换机类型都是可以把同一条消息路由到多个消费者身上的。而hello world、work queue不行。work queue和hello world模型同一条消息只能路由到某一个消费者身上

public class provider {    /**     * direct模式(直连交换机)     * @param args     */    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("direct_exchange","direct",true,false,null); /**  * 参数2:路由键,如果消费者有符合的则可以接收消息  */ channel.basicPublish("direct_exchange","user_log", MessageProperties.PERSISTENT_TEXT_PLAIN,  "hello,direct".getBytes()); channel.close(); connection.close();    }}
public class comsumer1 {    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("direct_exchange","direct",true,false,null); String queueName = channel.queueDeclare().getQueue(); //可以绑定多个路由,只要符合一个就可以接收到消息 channel.queueBind(queueName,"direct_exchange","user_log");// channel.queueBind(queueName,"direct_exchange","user_money"); channel.basicConsume(queueName,true,new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("comsumer1===>"+new String(body));     } });    }}
public class comsumer2 {    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("direct_exchange","direct",true,false,null); String queueName = channel.queueDeclare().getQueue(); //可以绑定多个路由,只要符合一个就可以接收到消息// channel.queueBind(queueName,"direct_exchange","user_log"); channel.queueBind(queueName,"direct_exchange","user_money"); channel.basicConsume(queueName,true,new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("comsumer2===>"+new String(body));     } });    }}

topic模型(通配符)

特点:通配符(#号和*号),也可以不使用通配符。

public class provider {    /**     * topic模式     * topic和direct相比,基本差不多,只不过topic可以使用通配符进行匹配     * 在topic模式下,生产者发送的路由键是user.log.test,消费者可以用user.#或者#.log.test或者*.*.test 。。。等等来匹配     * #:代表一个或多个单词的占位符     * *:代表一个单词的占位符,如上面,user.*是匹配不了user.log.test的。。。。。     * 交换机性能:fanout>direct>topic     */    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topic_exchange","topic",true,false,null); for (int i = 0; i < 10; i++) {     String msg="topic_hello_"+i;     channel.basicPublish("topic_exchange","log.order.money", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes()); } channel.close(); connection.close();    }}
public class consumer1 {    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String queueName = channel.queueDeclare().getQueue(); channel.exchangeDeclare("topic_exchange","topic",true,false,null); channel.queueBind(queueName,"topic_exchange","log.order.money"); channel.basicConsume(queueName,true,new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("consumer1==>"+new String(body));     } });    }}
public class consumer2 {    public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/ems"); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String queue = channel.queueDeclare().getQueue(); channel.exchangeDeclare("topic_exchange","topic",true,false,null); /**  * log.#===>#代表后面可以有一个或多个。  * log,* ==>代表后面只能有一个,也就是类似log.xx 才能匹配上  */ channel.queueBind(queue,"topic_exchange","log.#"); channel.basicConsume(queue,true,new DefaultConsumer(channel){     @Override     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  System.out.println("consumer2====>"+new String(body));     } });    }}