> 技术文档 > Python:消息队列(RabbitMQ)应用开发实践_python消息队列

Python:消息队列(RabbitMQ)应用开发实践_python消息队列


文章目录

  • 前言
  • 一、名词解释
    • 1、核心概念
    • 2、交换器类型
  • 二、消息模式
    • 1、简单模式
    • 2、工作队列
      • 消息持久性
    • 3、发布订阅
    • 4、路由模式
    • 5、主题模式
      • 通配符规则
    • 6、RPC模式
      • RPC工作流程
  • 三、安装部署
    • 1、拉取镜像
    • 2、启动容器
    • 3、使用`docker-compose`
  • 四、应用测试
    • 1、安装依赖
    • 2、创建连接
      • 2.1、引入依赖
      • 2.2、创建连接参数
      • 2.3、建立连接
    • 3、生产消息
    • 4、消费消息
  • 总结

前言

  官网有云:RabbitMQ是一个可靠且成熟的消息传递和流媒体代理,易于在云环境、本地和本地计算机上部署。支持多种开放标准协议,包括 AMQP 1.0MQTT 5.0。提供了许多选项,定义消息如何从发布者发送到一个或多个使用者。路由、筛选、流式处理、联合身份验证等。通过确认消息传输和跨集群复制消息的能力,确保消息是安全的。

一、名词解释

1、核心概念

  • 生产者(Producer):发送消息到队列的应用程序。
  • 消费者(Consumer):从队列中接收消息的应用程序。
  • 交换器(Exchange):接收来自生产者的消息,并根据绑定规则将消息路由到相应的队列。
  • 队列(Queue):存储消息,等待消费者处理。
  • 绑定(Binding):定义交换器和队列之间的关系。
  • 头交换器(Headers Exchange):使用消息头属性进行路由,而不是路由键。

2、交换器类型

  • 直接交换器(Direct Exchange):根据消息的路由键精确匹配队列。
  • 主题交换器(Topic Exchange):根据模式匹配路由键,将消息路由到一个或多个队列。
  • 扇出交换器(Fanout Exchange):将消息广播到所有绑定的队列,不考虑路由键。

二、消息模式

  RabbitMQ提供6种消息模式:简单模式、工作队列、发布订阅、路由模式、主题模式和RPC模式。

1、简单模式

  最简单的队列。

Python:消息队列(RabbitMQ)应用开发实践_python消息队列

  • P:生产者,也就是要发送消息的程序。
channel.queue_declare(queue=queue_name)channel.basic_publish(exchange=\'\',routing_key=queue_name,body=\'Hello World!\')

  参数exchange不指定时,消息会发送的默认的Exchange,此时的routing_key必须是queue_name(队列名称)。

  • C:消费者:消息的接受者,会一直等待消息到来。
channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback)

  参数 auto_ack=True 表示自动应答,消费者取出消息后,消息队列将删除此消息。

  • Queue:消息队列。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

2、工作队列

  在Worker之间分配任务(竞争消费者模式)。两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。

Python:消息队列(RabbitMQ)应用开发实践_python消息队列
  通过 BasicQos 方法设置prefetchCount = 1,如:channel.basic_qos(0, 1, False);。使得每个消费者在同一个时间点最多处理1个消息。prefetchCount只有在手动ackauto_ack=False)的情况下才生效,自动ack不生效。

消息持久性

  使用手动ackauto_ack=False)能够确保消息处理失败或者消费者崩溃时任务不会丢失,但是如果RabbitMQ服务器停止、退出或者崩溃时,队列中的消息将丢失。需要做两件事来确保消息不会丢失:

  • 首先,定义队列时指定队列是持久的(durable=True),如:
channel.queue_declare(queue=queue_name, durable=True)

  如果队列已经存在,则要先删除队列。RabbitMQ 不允许重新定义队列属性。

  • 将消息标记为持久化:

  通过发布消息时提供properties参数,指定delivery_mode=pika.DeliveryMode.Persistent,如:

channel.basic