spring cloud stream kafka 集成
spring cloud stream kafka 集成 有完整demo
第一步:需要引入如下依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> <version>2.0.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> <version>2.0.1.RELEASE</version> </dependency> <!-- spring-integration 这个依赖必须加上, 不然系统启动会报找不到类文件 --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId> <version>3.0.2.RELEASE</version> <exclusions> <exclusion> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </exclusion> <exclusion> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> </exclusion> </exclusions> </dependency>
第二步:application.yml进行配置如下:
server: port: 8080spring: application: name: dum-stream cloud: stream: kafka: binder: brokers: 192.168.1.202:9092 auto-create-topics: true bindings: testStreamOut: destination: test-stream contentType: application/json testStreamInput: destination: test-stream contentType: application/json
第三步:springcloud-stream模块的代码编写,在该模块下定义一个StreamMessageService,如下:
import com.test.dum.stream.service.AppsStreams;import com.test.dum.stream.param.MessageStream;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;import javax.annotation.Resource;/ * * 消息发送统一入口 */
@Servicepublic class StreamMessageService { @Resource private AppsStreams appsStreams; public boolean sendMessage(MessageStream messageStream) { boolean isSuccess = appsStreams.testStreamOut().send(MessageBuilder .withPayload(messageStream) .build()); return isSuccess; }}
第四步:消息配置类如下:
import org.springframework.cloud.stream.annotation.Input;import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;public interface AppsStreams { / 执法系统消息统一管理 */ String TEST_STREAM_OUT = "testStreamOut"; String TEST_STREAM_INPUT = "testStreamInput"; / * output=生产者 * @return */ @Output(TEST_STREAM_OUT) MessageChannel testStreamOut(); / * 消费者 * @return */ @Input(TEST_STREAM_INPUT) MessageChannel testStreamInput();}
第五步:绑定消息通道:
import com.test.dum.stream.service.AppsStreams;import org.springframework.cloud.stream.annotation.EnableBinding;@EnableBinding(AppsStreams.class)public class StreamsConfig {}
--->第六步:消息消费:```cimport com.test.dum.stream.param.MessageStream;import com.test.dum.stream.service.AppsStreams;import lombok.extern.slf4j.Slf4j;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;@EnableBinding(value = AppsStreams.class)@Slf4jpublic class StreamMessageListener { / * 测试消息类型 * * @param payload */ @StreamListener(AppsStreams.TEST_STREAM_INPUT) public void handleStreamCity(MessageStream payload) { log.info("消息接收: " + payload); }}
项目demo
spring cloud stream kafka demo下载地址