> 技术文档 > 基于Kafka实现简单的延时队列

基于Kafka实现简单的延时队列

生命无罪,健康万岁,我是laity。

我曾七次鄙视自己的灵魂:

第一次,当它本可进取时,却故作谦卑;

第二次,当它在空虚时,用爱欲来填充;

第三次,在困难和容易之间,它选择了容易;

第四次,它犯了错,却借由别人也会犯错来宽慰自己;

第五次,它自由软弱,却把它认为是生命的坚韧;

第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;

第七次,它侧身于生活的污泥中,虽不甘心,却又畏首畏尾。

基于Kafka实现简单的延时队列

业务场景:
listener kafka 中的指定topic,接收并处理其中的message,再基于websocket向前端推送数据,前端接收到数据后将数据放置到定时队列中,进行5s的倒计时
,情况1:时间到了进行触发下一步的接口(该操作为自动操作)
,情况2:时间未到有人为干预进行点击进入下一步的接口(该操作为人工操作)。
问题:当前端页面进行切换页面后,前端是无法将定时队列中的数据进行存储,从而进行清空;
解决方案:kafka的延时队列解决切换页面后未处理的message;
1.解决流程:
(1)listener 到 message 并处理后直接进行走一遍自动操作,
(2)并将存入库中的saveId进行返回至websocket推向前端的JSON数据中,
(3)再通过写好的send方法将JSONObject发送至kafka的消息队列中,
(4)之后不论人工还是自动都进行update操作(基于saveId可以去查询到’自动处理’时的留痕,判断update_time的是否存在,不在则进行update(因为触发update只有当前端时间到了或kafka延时队列的listener))。

实现流程

1.引入pom依赖

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <optional>true</optional></dependency>

2.yaml进行自动配置

spring: application: # 应用名称 name: youServerName kafka: bootstrap-servers: youKafkaIp:9092 consumer: enable-auto-commit: false value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

3.各各类的实现

DelayEnum.java

import lombok.AllArgsConstructor;import lombok.Getter;/** * @author laity */@Getter@AllArgsConstructorpublic enum DelayEnum { FIVE_S(5, \"topic_message_5s\"), TEN_S(10, \"topic_message_10s\"); private final int delay_time; private final String topic_name;}

KafkaDelayMsg.java

import lombok.Data;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/** * @author laity */@Datapublic class KafkaDelayMsg implements Delayed { private String msg; // content - 可以换成对应的实体 private DelayEnum delayEnum; private long time; public KafkaDelayMsg() { } public KafkaDelayMsg(String msg, DelayEnum delayEnum) { this.msg = msg; this.delayEnum = delayEnum; this.time = System.currentTimeMillis() + delayEnum.getDelay_time() * 1000; } /* 延时队列实现的关键 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(this.time, ((KafkaDelayMsg) o).time); }}

KafkaUtil.java

import lombok.extern.slf4j.Slf4j;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.context.annotation.Bean;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.*;import java.util.HashMap;import java.util.Map;/** * @author laity */@Slf4jpublic class KafkaUtil { KafkaProducer<String, String> kafkaProducer = null; private static final Map<String, Object> map = new HashMap<>(); public KafkaUtil() { map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, \"192.168.x.x:9092\"); map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); } // send public void send(KafkaDelayMsg msg) { KafkaProducer producer = new KafkaProducer(map); if (msg.getDelayEnum().getDelay_time() == DelayEnum.FIVE_S.getDelay_time()) { KafkaDelayQueue.FIVE_S.add(msg); } KafkaDelayQueue.run(producer); }}

KafkaDelayQueue.java

import lombok.extern.slf4j.Slf4j;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.concurrent.DelayQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * @author laity 设计队列 */@Slf4jpublic class KafkaDelayQueue { public static DelayQueue<KafkaDelayMsg> FIVE_S = new DelayQueue<>(); // 实际sendMsg的function public static void run(KafkaProducer<String, String> producer) { ExecutorService executorService = Executors.newFixedThreadPool(5); executorService.execute(() -> { log.info(\"=============== 开始推送执行 ==================\"); while (true) { try {  KafkaDelayMsg take = FIVE_S.take();  // 推送数据  RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(take.getDelayEnum().getTopic_name(), take.getMsg())).get();  log.info(\"============================= CONTENT:\" + take.toString() + \"-\" + recordMetadata.topic() + \"-\" + recordMetadata.partition() + \" ===============================\"); } catch (Exception e) {  e.printStackTrace(); } } }); }}

KafkaConsumer.java - 业务代码

import com.fasterxml.jackson.databind.ObjectMapper;import lombok.RequiredArgsConstructor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Date;/** * @author laity 消费 */@Component@RequiredArgsConstructor(onConstructor_ = {@Autowired})public class KafkaConsumer { private final KelCjTaskResultMapper mapper; // beanRef的功能 public String getTopicName() { return DelayEnum.FIVE_S.getTopic_name(); } // @KafkaListener(topics = \"send_message_5s\", groupId = \"consumer-w-1\") @KafkaListener(topics = \"#{__listener.getTopicName()}\", groupId = \"consumer-w-1\") public void listen(String message) { // 1.将message进行处理转换 KelCjJudgeRule kelCjJudgeRule = new KelCjJudgeRule(); ObjectMapper objectMapper = new ObjectMapper(); try { kelCjJudgeRule = objectMapper.readValue(message, KelCjJudgeRule.class); } catch (IOException e) { throw new RuntimeException(\"Failed to deserialize JSON: \" + e.getMessage(), e); } // 2.getId用于判断updateTime是否具有 - 情况 yes 说明手动处理过 no 未手动处理过 - pass or update if (kelCjJudgeRule.getId() != null) { // cjTaskId KelCjTaskResult kelCjTaskResult = mapper.selectById(kelCjJudgeRule.getId()); if (kelCjTaskResult != null) { if (kelCjTaskResult.getUpdateTime() == null) {  kelCjTaskResult.setUpdateTime(new Date());  kelCjTaskResult.setCjResultTime(new Date());  int i = mapper.updateKelCjTaskResult(kelCjTaskResult); } else { } } } }}

总结

年轻人,你的职责是平整土地,而非焦虑时光。你做三四月的事,在八九月自有答案。我是Laity,正在前进的Laity。