Kafka如何实现延迟队列?_kafka延迟队列
Kafka并没有使⽤JDK⾃带的Timer或者DelayQueue来实现延迟的功能,⽽是基于时间轮⾃定义了⼀个⽤于实现延迟功能的定时器(SystemTimer)。
JDK的Timer和DelayQueue插⼊和删除操作的平均时间复杂度为O(nlog(n)),并不能满⾜Kafka的⾼性能要求,⽽基于时间轮可以将插⼊和删除操作的时间复杂度都降为O(1)。
时间轮的应⽤并⾮Kafka独有,其应⽤场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。
底层使⽤数组实现,数组中的每个元素可以存放⼀个TimerTaskList对象。
TimerTaskList是⼀个环形双向链表,在其中的链表项TimerTaskEntry中封装了 真正的定时任务TimerTask.Kafka中到底是怎么推进时间的呢?
Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。
具体做法是对于每个使⽤到的TimerTaskList都会加⼊到DelayQueue 中。
Kafka中的TimingWheel专⻔⽤来执⾏插⼊和删除TimerTaskEntry的操作,⽽DelayQueue专⻔负责时间推进的任务。
再试想⼀下,DelayQueue中的第⼀个超时任务列表的expiration为200ms,第⼆个超时任务为840ms,这⾥获取DelayQueue的队头只需要O(1)的时间复杂度。
如果采⽤每秒定时推进, 那么获取到第⼀个超时的任务列表时执⾏的200次推进中有199次属于“空推进”,⽽获取到第⼆个超时任 务时有需要执⾏639次“空推进”,这样会⽆故空耗机器的性能资源,这⾥采⽤DelayQueue来辅助以少量空间换时间,从⽽做到了“精准推进”。
Kafka中的定时器真可谓是“知⼈善⽤”,⽤TimingWheel做最擅⻓的任务添加和删除操作,⽽⽤DelayQueue做最擅⻓的时间推进⼯作,相辅相成。
⏳ Kafka 实现延迟队列的两种核心方案
由于 Kafka 本身不支持直接延迟投递,需通过 时间轮+外部存储 或 插件 实现。以下是详细实现方法及选型建议。
1. 🕒 方案一:TTL+死信队列(原生支持)
📌 实现原理
-
创建 延迟Topic(如
delay-5min
),设置消息TTL(5分钟过期) -
过期后自动转入 死信队列(实际消费的Topic)
🔧 步骤详解
# 1. 创建延迟Topic(设置5分钟TTL和死信路由)kafka-topics.sh --create \\ --topic delay-5min \\ --partitions 3 \\ --config retention.ms=300000 \\ --config cleanup.policy=delete \\ --config x-dead-letter-topic=real-orders \\ --bootstrap-server kafka1:9092# 2. 生产者发送延迟消息ProducerRecord record = new ProducerRecord( \"delay-5min\", null, // key=null走轮询分区 \"订单取消:123\");producer.send(record);# 3. 消费者订阅真实Topickafka-console-consumer.sh --topic real-orders --bootstrap-server kafka1:9092
✅ 优点
-
无需额外组件
-
适合简单延迟场景(如固定5分钟/10分钟)
❌ 缺点
-
延迟时间固定,无法动态指定
-
需要维护多个Topic
2. 🚀 方案二:Kafka+外部调度器(推荐)
📌 架构设计
🔧 实现步骤
-
生产者:将消息+延迟时间存入DB
// 消息结构示例{ \"id\": \"msg-123\", \"topic\": \"orders\", \"body\": \"订单取消:123\", \"deliver_time\": \"2023-07-20T15:30:00Z\", // ISO8601时间 \"status\": \"PENDING\"}
-
调度器:
-
定时扫描DB(
WHERE deliver_time <= NOW()
) -
将到期的消息写入Kafka目标Topic
-
-
消费者:正常消费目标Topic
✅ 优点
-
支持任意精确延迟(秒级控制)
-
可查询/修改延迟任务
❌ 缺点
-
需引入额外存储和调度服务
3. 🔌 方案三:Kafka插件(非官方)
📌 使用 kafka-delayed-queue
插件
# 安装插件(需重启Broker)cp kafka-delayed-queue.jar /kafka/libs/
配置延迟Topic
# server.propertiesdelayed.queue.enable=truedelayed.queue.topic=delayed-orders
发送延迟消息
// 设置延迟头信息record.headers().add(\"x-delay-ms\", \"300000\".getBytes()); // 5分钟producer.send(record);
✅ 优点
-
原生延迟支持
-
无需额外存储
❌ 缺点
-
社区插件,生产环境需验证稳定性
4. ⚖️ 方案对比决策表
5. 🛠️ 生产建议:电商订单超时案例
# 伪代码:延迟30分钟关闭未支付订单def produce_delayed_message(order_id): # 计算延迟时间 deliver_time = datetime.now() + timedelta(minutes=30) # 存入MongoDB db.delayed_messages.insert_one({ \"order_id\": order_id, \"topic\": \"order-timeout\", \"body\": f\"超时关闭:{order_id}\", \"deliver_time\": deliver_time })# 调度器每分钟扫描一次while True: messages = db.delayed_messages.find({\"deliver_time\": {\"$lte\": datetime.now()}}) for msg in messages: kafka.produce(msg[\"topic\"], msg[\"body\"]) db.delete(msg[\"_id\"]) sleep(60)
📌 终极总结
-
简单需求 → 用TTL+死信队列(低成本)
-
精准延迟 → Kafka+外部调度器(推荐方案)
-
技术尝鲜 → 尝试社区插件
Kafka实现延迟队列如同 快递定时配送:
-
TTL方案:像快递柜,固定时间后取件
-
调度器方案:像专人预约配送,精准控制时间
-
插件方案:像智能快递机器人,尚在试验阶段
根据业务需求选择合适方案! ⏱️🚚