> 文档中心 > RabbitMQ的搭建和操作

RabbitMQ的搭建和操作

创建channel (信道)

ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.1.104");factory.setUsername("root");factory.setPassword("123456");Connection connection = factory.newConnection();Channel channel = connection.createChannel();

声明路由 参数(路由名,路由类型)

//路由类型 //BuiltinExchangeType.FANOUT/"fanout"   扇出 广播到所有已绑定队列//BuiltinExchangeType.DIRECT/"direct"   直接 //BuiltinExchangeType.TOPIC/"topic"     主题//BuiltinExchangeType.HEADERS/"headers" 标题channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

声明队列 参数(队列名,是否持久化,是否排他,自动删除,null)

//队列名可以自己起名也可以随机 String queueName=UUID.randomUUID().toString();channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, arguments:null);//也可以直接产生随机队列,断开连接时,队列自动删除String queueName = channel.queueDeclare().getQueue();//预取值int prefetchCount=n; 队列只能存n个未处理消息channel.basicQos(prefetchCount);//

队列绑定 参数(队列名,路由名,路由Key/绑定Key)

//对应扇出路由(fanout) routingKey/BindingKey为空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");//对应直接路由(direct) routingKey/BindingKey为自定义的字符串 与basicPublish中第二个参数(队列名/路由绑定Key)对应 一个队列可以有多个绑定 哪个绑定与basicPublish相同发哪个 一个队列有多个相同的绑定,消息只接收一次channel.queueBind(queueName, EXCHANGE_NAME, "自定义的字符串1");channel.queueBind(queueName, EXCHANGE_NAME, "自定义的字符串2");//对应扇出路由(topic) routingKey/BindingKey是一个不超过255字节的单词列表,以点号分隔开 *(星号)可以代替一个单词 #(井号)可以替代零个或多个单词 例如*.abc.*  bcd.# 只要某个basicPublish中队列名或者绑定Key符合单词列表的规则,就从这个队列发送消息(basicPublish也可以用*和#)channel.queueBind(queueName, EXCHANGE_NAME, "单词列表");

发布者发布消息 参数(路由名,队列名/路由绑定Key,消息属性,消息.getBytes(“编码方式”))

//路由名可以问空串"",使用默认路由//消息属性:①MessageProperties.PERSISTENT_TEXT_PLAIN消息持久化②null不作配置channel.basicPublish(EXCHANGE_NAME,QUEUE_NAME,null, message.getBytes("UTF-8"));//发布确认//单个确认发布 每次发布都等待发布成功  特点:慢 稳channel.confirmSelect();for(){channel.basicPublish();boolean flag=channel.waitForConfirms();    if(flag) {System.out.println("成功");}}//批量确认发布 按批确认一次 特点:快一点 出错了不知道错的是哪个,要整批重发int batchSize=n;channel.confirmSelect();for(){    channel.basicPublish();    count++;    if(count%batchSize==0){ boolean flag=channel.waitForConfirms(); if(flag){System.out.println("成功");}    }  }//异步确认发布 发布和确认是分开的 可以边发布边确认 可能刚确认一部分就全部发布了 发布的消息是一个键值对,键是消息序号,值是消息内容 哪个发布错误就返回哪个的序号重新发布 

消费者接收消息 参数(队列名,是否自动应答,deliverCallback,cancelCallback)

DeliverCallback deliverCallback=(consumerTag,delivery)->{};CancelCallback cancelCallback=(message)->{};channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
boolean autoAck=false
//consumerTag是Consumer客户端的标识符//如果没有在Consumer客户端设置consumerTag,服务端会自动生成一个唯一的consumerTag//consumerTag必须保持唯一,即已被某个Consumer使用的consumerTag不可同时被另一个Consumer使用//consumerTag在同一个channel内有效,即已在某个channel内被创建的不可在另一个channel内被使用//同一会话consumerTag是固定的可以做此会话的名字,deliveryTag 每次接收消息+1,可以做此消息处理通道的名字。因此deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)DeliverCallback deliverCallback=(consumerTag,delivery)->{    //  消息回调    String message=new String(delivery.getbody(),"UTF-8"); 得到String类型消息本体    System.out.println("接收到消息:"+message);    //  手动应答 消息重新入队    boolean multiple=false(只应答当前消息)/true(批量应答,应答当前消息和之前的所有未应答消息)//①肯定应答Channel.basicAck(delivery.getEnvelope().getDeliveryTag(),multiple)已成功处理该消息,可以将其丢弃了//②否定应答Channel.basicNack(delivery.getEnvelope().getDeliveryTag(),multiple)不处理该消息了可以将其丢弃了 //③否定应答Channel.basicReject(delivery.getEnvelope().getDeliveryTag())拒绝处理该消息可以将其丢弃了//  delivery.getEnvelope().getDeliveryTag() delivery->得到信封->得到传递序号(DeliveryTag)    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),multiple);//将收到的消息保存到磁盘    File file = new File("C:\\work\\rabbitmq_info.txt");    FileUtils.writeStringToFile(file,message,"UTF-8"); delivery.getEnvelope().getRoutingKey():得到信息的路由key也就是绑定key(BindingKey);    };
CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};

死信队列 队列声明最后一个参数Map arguments

//arguments.put("x-message-ttl",10000); 过期时间一般不在队列设置 因为是对整个队列的消息产生作用 相比较下在发布时声明过期时间更灵活arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);arguments.put("x-dead-letter-routing-key","lisi");

Python学习手册