> 文档中心 > spring cloud stream kafka 集成

spring cloud stream kafka 集成


Spring Cloud Stream 与 Kafka 的集成(文末附完整 demo)。

第一步:需要引入如下依赖:

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-actuator</artifactId>
 </dependency>
 <dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-stream</artifactId>
     <version>2.0.1.RELEASE</version>
 </dependency>
 <dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-kafka</artifactId>
     <version>2.0.1.RELEASE</version>
 </dependency>
 <!-- The spring-integration dependency below is required; without it the
      application fails at startup with class-not-found errors. -->
 <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-kafka</artifactId>
     <version>3.0.2.RELEASE</version>
     <exclusions>
         <exclusion>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
         </exclusion>
         <exclusion>
             <groupId>org.springframework.integration</groupId>
             <artifactId>spring-integration-core</artifactId>
         </exclusion>
     </exclusions>
 </dependency>

第二步:application.yml进行配置如下:

# HTTP port of the demo application.
server:
  port: 8080

spring:
  application:
    name: dum-stream
  cloud:
    stream:
      # Kafka binder settings: broker address and topic auto-creation.
      kafka:
        binder:
          brokers: 192.168.1.202:9092
          auto-create-topics: true
      # Channel bindings; the names must match the constants declared in AppsStreams.
      # Both channels point at the same destination, so the app consumes what it produces.
      bindings:
        testStreamOut:
          destination: test-stream
          contentType: application/json
        testStreamInput:
          destination: test-stream
          contentType: application/json

第三步:springcloud-stream模块的代码编写,在该模块下定义一个StreamMessageService,如下:

import com.test.dum.stream.service.AppsStreams;import com.test.dum.stream.param.MessageStream;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;import javax.annotation.Resource;/ * 

* 消息发送统一入口 */@Servicepublic class StreamMessageService { @Resource private AppsStreams appsStreams; public boolean sendMessage(MessageStream messageStream) { boolean isSuccess = appsStreams.testStreamOut().send(MessageBuilder .withPayload(messageStream) .build()); return isSuccess; }}



第四步:消息配置类如下:

import org.springframework.cloud.stream.annotation.Input;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;public interface AppsStreams {    / 执法系统消息统一管理 */    String TEST_STREAM_OUT = "testStreamOut";    String TEST_STREAM_INPUT = "testStreamInput";    /     * output=生产者     * @return     */    @Output(TEST_STREAM_OUT)    MessageChannel testStreamOut();    /     * 消费者     * @return     */    @Input(TEST_STREAM_INPUT)    MessageChannel testStreamInput();}

第五步:绑定消息通道:

import com.test.dum.stream.service.AppsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;

/**
 * Marker class whose only job is to carry the {@code @EnableBinding}
 * annotation for the channels declared in {@link AppsStreams}.
 */
@EnableBinding(AppsStreams.class)
public class StreamsConfig {
}
第六步:消息消费:

import com.test.dum.stream.param.MessageStream;
import com.test.dum.stream.service.AppsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

/**
 * Consumes messages from the input channel declared in {@link AppsStreams}.
 */
@EnableBinding(value = AppsStreams.class)
@Slf4j
public class StreamMessageListener {

    /**
     * Handles a message of the test type received on
     * {@link AppsStreams#TEST_STREAM_INPUT} by logging it.
     *
     * @param payload the received message payload
     */
    @StreamListener(AppsStreams.TEST_STREAM_INPUT)
    public void handleStreamCity(MessageStream payload) {
        // Placeholder form produces the same text as the original concatenation.
        log.info("消息接收: {}", payload);
    }
}

项目demo

spring cloud stream kafka demo下载地址