Spring Kafka进阶:实现多态消息消费_spring kafka多线程消费
Spring Kafka 官方提供了一种基于消息类型的消费模式,通过类上的@KafkaListener注解配合方法上的@KafkaHandler,实现多态消息的自动路由处理。其典型实现方式如下:
@KafkaListener(id = \"multi\", topics = \"myTopic\")static class MultiListenerBean { @KafkaHandler public void listen(String foo) { ... } @KafkaHandler public void listen(Integer bar) { ... } @KafkaHandler(isDefault = true) public void listenDefault(Object object) { ... }}
根据 Spring Kafka 官方文档 @KafkaListener on a Class 的说明:
When messages are delivered, the converted message payload type is used to determine which method to call. 接收到消息后,会使用转换后的消息类型来决定调用哪个方法。
开发者可以基于此特性,轻松实现从简单文本消息到复杂领域事件的各种消息处理场景。
一、实现多态消息消费
第一步:环境准备,引入必要依赖
首先在项目的 pom.xml 中添加 Spring Kafka 的依赖。
org.springframework.kafka spring-kafka ${spring-kafka-version} com.alibaba fastjson ${fastjson-version}
第二步:消息模型设计,建立统一消息基类
设计一个抽象消息基类作为所有消息类型的父类,包含公共字段和基础方法:
/** * 消息抽象基类,所有具体消息类型都应继承此类 */public abstract class AbstractMessage { /** 消息id */ private String messageId; public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; }}
第三步:实现智能反序列化器
import org.apache.kafka.common.serialization.Deserializer;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.parser.Feature;public class MessageDeserializer implements Deserializer { @Override public AbstractMessage deserialize(String topic, byte[] data) { // 1. 字节数组转字符串 String jsonData = new String(data); // 2. 使用Fastjson的自动类型识别功能 Object object = JSON.parse(jsonData, Feature.SupportAutoType); if (object instanceof AbstractMessage) { return (AbstractMessage) object; } return null; }}
第四步:配置消费者工厂
创建 Spring Kafka 配置类,设置消费者相关参数:
@Configuration@EnableKafkapublic class Config { /** 消费者工厂 */ @Bean public ConsumerFactory consumerFactory() { Map configs = new HashMap(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, \"localhost:9092\"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 自定义的反序列化实现类 configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class); return new DefaultKafkaConsumerFactory(configs); } /** 监听容器工厂 */ @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); factory.setConcurrency(1); // 设置消费者线程数 return factory; }}
第五步:实现消息监听
创建消息监听服务,利用 @KafkaHandler 实现多态消息处理:
@Service@KafkaListener(id = \"orderService\", topics = \"order.topic\")public class OrderMessageListener { @KafkaHandler public void handle(OrderCreateEvent event) { // 业务处理逻辑 System.out.println(\"收到 OrderCreateEvent 消息\"); } @KafkaHandler public void handle(OrderCancelEvent event) { // 业务处理逻辑 System.out.println(\"收到 OrderCancelEvent 消息\"); }}
最后,我们往 topic=order.topic 中发一条消息:
{ \"@type\": \"io.github.open.easykafka.event.OrderCreateEvent\", \"id\": \"67ff910f020000010001b064\"}
消费方控制台打印如下:
收到 OrderCreateEvent 消息
至此,我们实现了多态消息的消费模式。同时也引发了我们的思考:Spring Kafka 是如何精准找到正确的方法的?
二、实现原理
下面我们就来深入剖析 Spring Kafka 框架实现这一机制的核心原理。
为便于阅读和理解,以下 Spring Kafka 源码部分均省略和简化了无关代码。
2.1 注解扫描
Spring Kafka 会对所有被 @KafkaListener 标记的类和方法进行扫描,在本文示例中,@KafkaListener 标注在了类上,扫描逻辑如下:
public class KafkaListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton { @Override public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { // 查找被 @KafkaListener 标记的类 Collection classLevelListeners = findListenerAnnotations(targetClass); // 查找被 @KafkaHandler 标记的方法 final List multiMethods = new ArrayList(); Set methodsWithHandler = MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null); multiMethods.addAll(methodsWithHandler); // 处理每个找到的监听方法 processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName); return bean; }}
2.2 端点注册
Spring Kafka 会根据 @KafkaListener 注解位置自动识别并创建不同类型的端点:
public class KafkaListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton { private void processMultiMethodListeners(Collection classLevelListeners, List multiMethods, Object bean, String beanName) { for (KafkaListener classLevelListener : classLevelListeners) { // @KafkaListener 标注在类上会被描述为 MultiMethodKafkaListenerEndpoint MultiMethodKafkaListenerEndpoint endpoint = new MultiMethodKafkaListenerEndpoint(checkedMethods, defaultMethod, bean); processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName); } }}
@KafkaListener 标注在类上会被 Spring Kafka 描述为 MultiMethodKafkaListenerEndpoint,它是 MethodKafkaListenerEndpoint 的子类。这些端点(KafkaListenerEndpoint)最终会被注册到注册表 KafkaListenerEndpointRegistry 的 List 集合中。
2.3 消息监听器实例化
Spring Kafka 会将注册表 KafkaListenerEndpointRegistry 中的端点 KafkaListenerEndpoint 实例化成消息监听器 MessageListener 对象。
public abstract class AbstractKafkaListenerEndpoint implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean { private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) { // 创建消息监听适配器 MessagingMessageListenerAdapter adapter = createMessageListener(container, messageConverter); Object messageListener = adapter; container.setupMessageListener(messageListener); }}
创建消息监听适配器的过程如下。
public class MethodKafkaListenerEndpoint extends AbstractKafkaListenerEndpoint { @Override protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container, MessageConverter messageConverter) { // 创建消息监听器实例 MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(messageConverter); return messageListener; } protected MessagingMessageListenerAdapter createMessageListenerInstance(MessageConverter messageConverter) { // 最终的消息监听器是 RecordMessagingMessageListenerAdapter RecordMessagingMessageListenerAdapter messageListener = new RecordMessagingMessageListenerAdapter( this.bean, this.method, this.errorHandler); return messageListener; }}
可以看到,最终的消息监听器是 RecordMessagingMessageListenerAdapter,它是接口 MessageListener 的实现类。
2.4 启动消息监听
在《深入Spring Kafka:消费者是如何创建的?》这篇文章中,我们描述了启动消息监听的详细过程。一个 @KafkaListener 对应一个 MessageListenerContainer,最终初始化后的消息监听器容器如下图所示:
在②中可见,ContainerProperties 中已经包含了处理消息的两个 handle 方法。
2.5 消息消费过程
基于前文分析,示例中的 OrderMessageListener 消费者最终会被实例化成 RecordMessagingMessageListenerAdapter。下面我们来看看该适配器收到消息后做了哪些处理。
public class RecordMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter implements AcknowledgingConsumerAwareMessageListener { @Override public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer) { Message message = toMessagingMessage(record, acknowledgment, consumer); // 反射调用业务方法 invokeHandler(record, acknowledgment, message, consumer); }}
反射调用业务方法过程如下。
public class DelegatingInvocableHandler { public Object invoke(Message message, Object... providedArgs) throws Exception { // 这里拿到的是 OrderCreateEvent.class Class payloadClass = message.getPayload().getClass(); // 通过 OrderCreateEvent.class 找 handle 方法 Object result = InvocableHandlerMethod handler = getHandlerForPayload(payloadClass); // 反射调用 handle 方法 handler.invoke(message, providedArgs) return new InvocationResult(result, replyTo, this.handlerReturnsMessage.get(handler)); }}
Spring Kafka 是如何通过 OrderCreateEvent.class 找到对应的 handle 方法的?
public class DelegatingInvocableHandler { protected InvocableHandlerMethod getHandlerForPayload(Class payloadClass) { // 通过 OrderCreateEvent.class 找 handle 方法 InvocableHandlerMethod handler = findHandlerForPayload(payloadClass); return handler; } protected InvocableHandlerMethod findHandlerForPayload(Class payloadClass) { InvocableHandlerMethod result = null; // 属性 handlers 里面有两个 Method 对象: // handle(OrderCreateEvent.class), handle(OrderCancelEvent.Class) for (InvocableHandlerMethod handler : this.handlers) { // 通过 OrderCreateEvent.class 找对应的 handle 方法 if (matchHandlerMethod(payloadClass, handler)) { result = handler; } } return result != null ? result : this.defaultHandler; } protected boolean matchHandlerMethod(Class payloadClass, InvocableHandlerMethod handler) { // 读取 Method handle(OrderCreateEvent.class) 对象的方法参数 Annotation[][] parameterAnnotations = handler.getMethod().getParameterAnnotations(); MethodParameter methodParameter = new MethodParameter(handler.getMethod(), 0); // 判断类型是否匹配 if (methodParameter.getParameterType().isAssignableFrom(payloadClass)) { return true; } return foundCandidate != null; }}
至此,Spring Kafka 便精准找到了 handle(OrderCreateEvent.class) 方法。
三、总结
上文所提到的代码调用链路如下:
RecordMessagingMessageListenerAdapter#onMessage() [捕获异常ListenerExecutionFailedException] [执行KafkaListenerErrorHandler]-> MessagingMessageListenerAdapter#toMessagingMessage() [反序列化]-> JsonMessageConverter#extractAndConvertValue() [反序列化]-> MessagingMessageListenerAdapter#invokeHandler() [反射调用]-> HandlerAdapter#invoke() [反射调用]-> DelegatingInvocableHandler#invoke() [反射调用] -> getHandlerForPayload() [查找调用方法] -> findHandlerForPayload() [查找调用方法]-> InvocableHandlerMethod#doInvoke [spring-message.jar] [反射调用]-> MessagingMessageListenerAdapter#handleResult [处理方法返回结果]-> KafkaMessageListenerContainer.ListenerConsumer#ackCurrent [ACK]