> 技术文档 > 基于Kafka实现动态监听topic功能_kafka主题监听

基于Kafka实现动态监听topic功能_kafka主题监听

生命无罪,健康万岁,我是laity。

我曾七次鄙视自己的灵魂:

第一次,当它本可进取时,却故作谦卑;

第二次,当它在空虚时,用爱欲来填充;

第三次,在困难和容易之间,它选择了容易;

第四次,它犯了错,却借由别人也会犯错来宽慰自己;

第五次,它自由软弱,却把它认为是生命的坚韧;

第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;

第七次,它侧身于生活的污泥中,虽不甘心,却又畏首畏尾。

基于Kafka实现动态监听topic功能

业务场景:导条根据各家接口进行数据分发,其中包含动态 kafka-topic;各家通过监听该 topic 获取数据,从而实现后续业务。

实现逻辑

pom

yaml 方案1 接收的是String

 # Scheme 1: keys and values are consumed as String.
 kafka:
   # Kafka broker address; several brokers may be given, separated by commas.
   bootstrap-servers: youKafkaIp:9092
   listener:
     type: batch
   consumer:
     enable-auto-commit: false
     value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     auto-offset-reset: earliest
     group-id: consumer-sb
   producer:
     value-serializer: org.apache.kafka.common.serialization.StringSerializer
     key-serializer: org.apache.kafka.common.serialization.StringSerializer

yaml 方案2 接收的是Byte

 # Scheme 2: keys and values are consumed as raw byte arrays.
 kafka:
   # Kafka broker address; several brokers may be given, separated by commas.
   bootstrap-servers: youKafkaIp:9092
   listener:
     type: batch
   consumer:
     enable-auto-commit: false
     value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
     key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
     auto-offset-reset: earliest
     group-id: consumer-sb
   producer:
     value-serializer: org.apache.kafka.common.serialization.StringSerializer
     key-serializer: org.apache.kafka.common.serialization.StringSerializer

收消息CODE

KafkaConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;import org.springframework.kafka.annotation.KafkaListenerConfigurer;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;import java.util.HashMap;import java.util.Map;/** * @author laity */@EnableKafka@Configurationpublic class KafkaConfig { // 解决 Could not create message listener - MessageHandlerMethodFactory not set TODO:WWS 不好使 /*@Bean public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationBeanPostProcessor() { KafkaListenerAnnotationBeanPostProcessor processor = new KafkaListenerAnnotationBeanPostProcessor(); processor.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); return processor; }*/ @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> map = new HashMap<>(); map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, \"youKafkaIp:9092\"); map.put(ConsumerConfig.GROUP_ID_CONFIG, \"consumer-laity\"); map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); return new DefaultKafkaConsumerFactory<String, String>(map); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(5); // new DefaultMessageHandlerMethodFactory() return factory; } // implements KafkaListenerConfigurer + 解决 Could not create message listener - MessageHandlerMethodFactory not set /*@Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); }*/}

KafkaListenerController.java

package cn.iocoder.yudao.server.controller.admin.szbl;import cn.iocoder.yudao.framework.common.pojo.CommonResult;import cn.iocoder.yudao.server.controller.admin.szbl.common.config.kafka.MyComponent;import cn.iocoder.yudao.server.controller.admin.szbl.vo.InitSceneVO;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.security.PermitAll;/** * @author laity */@RestController@RequestMapping(\"/kafka\")public class KafkaListenerController { private final MyComponent component; public KafkaListenerController(MyComponent component) { this.component = component; } private String topic;// 用于接收导条分发数据接口 @PostMapping(\"/reception\") @PermitAll public CommonResult<Boolean> putAwayL(@RequestBody InitSceneVO vo) { // …… 业务逻辑 // 去执行 监听固定的topic component.startListening(vo.getGzTopicName()); return CommonResult.success(true); }}

DynamicKafkaListenerService.java

import org.springframework.aop.framework.Advised;import org.springframework.aop.support.AopUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerEndpointRegistry;import org.springframework.kafka.config.MethodKafkaListenerEndpoint;import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;import org.springframework.stereotype.Service;import java.lang.reflect.Method;import java.time.LocalDateTime;import java.util.Objects;/** * @author laity 动态管理Kafka监听器 */@Servicepublic class DynamicKafkaListenerService { private final KafkaListenerEndpointRegistry registry; private final ConcurrentKafkaListenerContainerFactory<String, String> factory; @Autowired public DynamicKafkaListenerService(KafkaListenerEndpointRegistry registry, ConcurrentKafkaListenerContainerFactory<String, String> factory) { this.registry = registry; this.factory = factory; } public void addListener(String topic, String groupId, Object bean, Method method) { if (AopUtils.isAopProxy(bean)) { try { bean = ((Advised) bean).getTargetSource().getTarget(); } catch (Exception e) { throw new RuntimeException(e); } } MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>(); assert bean != null; endpoint.setBean(bean); endpoint.setMethod(method); endpoint.setTopics(topic); endpoint.setGroup(groupId); endpoint.setId(method.getName() + \"_\" + LocalDateTime.now()); endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); // 之前怎么点都点不出来这个属性 突然又出来了……无语 registry.registerListenerContainer(endpoint, factory, true); // 指定容器工厂 } public void removeListener(String beanName) { // 断言 Objects.requireNonNull(registry.getListenerContainer(beanName)).stop(); registry.unregisterListenerContainer(beanName); }}

BlueKafkaConsumer.java

import org.springframework.stereotype.Component;/** * @author laity */@Componentpublic class BlueKafkaConsumer { // @KafkaListener(topics = \"#{__listener.getTopicName()}\", groupId = \"consumer-laity\") public void listen(Object record) { System.out.println(\"======================= 接收动态KafkaTopics Received message ========================\"); System.out.println(record.toString()); }}

MyComponent.java

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import java.lang.reflect.Method;/** * @author laity */@Componentpublic class MyComponent { private final DynamicKafkaListenerService kafkaListenerService; private final BlueKafkaConsumer blueKafkaConsumer; @Autowired public MyComponent(DynamicKafkaListenerService kafkaListenerService, BlueKafkaConsumer blueKafkaConsumer) { this.kafkaListenerService = kafkaListenerService; this.blueKafkaConsumer = blueKafkaConsumer; } public void startListening(String topic) { try { Method blueMethod = BlueKafkaConsumer.class.getMethod(\"listen\", Object.class); kafkaListenerService.addListener(topic, \"consumer-laity\", blueKafkaConsumer, blueMethod); } catch (NoSuchMethodException e) { throw new RuntimeException(e); } } public void stopListening(String beanName) { kafkaListenerService.removeListener(beanName); } // init @PostConstruct // 这个是服务启动时调用 但我想要的时实时可变的 public void init() { }}

世界上最可贵的两个词,一个叫认真,一个叫坚持,认真的人改变自己,坚持的人改变命运,有些事情不是看到了希望才去坚持,而是坚持了才有希望。我是Laity,正在前进的Laity。