Spring Boot and Apache Flink Integration Tutorial

I. Background and Goals

Apache Flink is a high-performance distributed stream processing framework, while Spring Boot makes it easy to build enterprise-grade applications quickly. Integrating the two lets you:

  1. Use Spring Boot's dependency injection and configuration management to simplify Flink job development
  2. Embed stream processing into the Spring ecosystem as part of a complete microservice architecture
  3. Submit and manage jobs dynamically

II. Environment Setup

  • JDK 17+
  • Maven 3.8+
  • Spring Boot 3.1.5
  • Flink 1.17.2

III. Creating the Project & Adding Dependencies

1. Create the Spring Boot project

Generate a base project with Spring Initializr and select:

  • Maven
  • Spring Web (optional, for exposing REST endpoints)

2. Add the Flink dependencies

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.17.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.17.2</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime</artifactId>
    <version>1.17.2</version>
    <scope>test</scope>
  </dependency>
</dependencies>
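Note: when the job is executed locally inside the Spring Boot process (as in the basic example below), the flink-clients module usually also needs to be on the runtime classpath; without it, env.execute() typically fails because no executor factory can be found. A hedged addition, with the scope adjusted to how you actually run the application:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.17.2</version>
</dependency>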

IV. Basic Integration Example

1. Write the Flink streaming job

// src/main/java/com/example/demo/flink/WordCountJob.java
package com.example.demo.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJob {

    public static void execute() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements(
                "Spring Boot Flink integration",
                "Flink real-time stream processing",
                "Spring ecosystem integration"
        );

        DataStream<WordCount> counts = text
                .flatMap(new FlatMapFunction<String, WordCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordCount(word, 1L));
                        }
                    }
                })
                .keyBy(value -> value.word)
                .sum("count");

        counts.print();
        env.execute("Spring Boot Flink Job");
    }

    // Flink POJO: a public no-arg constructor and public fields are required
    // so that keyBy/sum can access the "word" and "count" fields by name.
    public static class WordCount {
        public String word;
        public long count;

        public WordCount() {}

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
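As a side note (not part of the original example), the same counting pipeline could be written with Flink's built-in Tuple2 type instead of a dedicated POJO; a minimal sketch of the transformation chain:

// Sketch: the same counting pipeline with Tuple2 instead of the WordCount POJO.
// Extra imports: org.apache.flink.api.common.typeinfo.Types,
//                org.apache.flink.api.java.tuple.Tuple2
DataStream<Tuple2<String, Long>> counts = text
        .flatMap((String value, Collector<Tuple2<String, Long>> out) -> {
            for (String word : value.split("\\s")) {
                out.collect(Tuple2.of(word, 1L));
            }
        })
        // a lambda loses its generic return type to erasure, so declare it explicitly
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)  // key by the word
        .sum(1);           // sum the count at tuple position 1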

2. Start the job from Spring Boot

// src/main/java/com/example/demo/DemoApplication.java
package com.example.demo;

import com.example.demo.flink.WordCountJob;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        WordCountJob.execute(); // start the Flink job
    }
}

V. Advanced Integration - Submitting Jobs Dynamically via a REST API

1. Create a job submission service

// src/main/java/com/example/demo/service/FlinkJobService.java
package com.example.demo.service;

import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Service;

@Service
public class FlinkJobService {

    public String submitWordCountJob(List<String> inputLines) {
        try {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> text = env.fromCollection(inputLines);
            // ... (same WordCount logic as above)

            JobExecutionResult result = env.execute();
            return "JobID: " + result.getJobID();
        } catch (Exception e) {
            return "Job Failed: " + e.getMessage();
        }
    }
}
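Be aware that env.execute() blocks the calling HTTP thread until the job finishes. For long-running or unbounded jobs, executeAsync() returns as soon as the job has been submitted; a hedged sketch of such a variant (the method name is illustrative, not from the original):

// Sketch: submit without blocking the HTTP thread; executeAsync() returns a
// JobClient as soon as the job has been submitted.
// Extra import: org.apache.flink.core.execution.JobClient
public String submitWordCountJobAsync(List<String> inputLines) {
    try {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.fromCollection(inputLines);
        // ... (same WordCount logic as above)

        JobClient client = env.executeAsync("wordcount-async");
        return "JobID: " + client.getJobID();
    } catch (Exception e) {
        return "Job Failed: " + e.getMessage();
    }
}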

2. Create the REST controller

// src/main/java/com/example/demo/controller/JobController.java
package com.example.demo.controller;

import java.util.List;
import com.example.demo.service.FlinkJobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/jobs")
public class JobController {

    @Autowired
    private FlinkJobService flinkJobService;

    @PostMapping("/wordcount")
    public String submitWordCount(@RequestBody List<String> inputs) {
        return flinkJobService.submitWordCountJob(inputs);
    }
}

VI. Key Configuration Notes

1. application.properties

# Use a local Flink execution environment
spring.flink.local.enabled=true
spring.flink.job.name=SpringBootFlinkJob
# Adjust the parallelism (based on CPU core count)
spring.flink.parallelism=4
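Note that these spring.flink.* keys are not standard Spring Boot or Flink properties; the application has to bind them and apply them to the execution environment itself. A minimal sketch of binding the parallelism key with @ConfigurationProperties (the FlinkProperties class name is illustrative, not from the original):

// Sketch: bind the custom spring.flink.parallelism key to a typed bean.
// (Nested keys such as spring.flink.job.name would need nested property classes.)
package com.example.demo.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "spring.flink")
public class FlinkProperties {

    /** Maps to spring.flink.parallelism */
    private int parallelism = 1;

    public int getParallelism() { return parallelism; }
    public void setParallelism(int parallelism) { this.parallelism = parallelism; }
}

A job class or service can then inject FlinkProperties and call env.setParallelism(flinkProperties.getParallelism()) when building the pipeline.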

2. Resolve dependency conflicts

Exclude the conflicting dependencies in pom.xml:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.17.2</version>
  <exclusions>
    <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>
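Depending on the connectors you add, the clash may instead be between Spring Boot's default Logback backend and a Log4j/SLF4J binding pulled in transitively (visible as "multiple SLF4J bindings" warnings at startup). One common, though optional, remedy is to move Spring Boot onto Log4j 2 so that the application and Flink share a single logging backend; a sketch:

<!-- Sketch: switch Spring Boot from Logback to Log4j 2
     (only needed if logging bindings actually clash) -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-logging</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>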

VII. Running and Verifying

  1. Start the Spring Boot application:
mvn spring-boot:run
  2. Call the API to submit a job:
curl -X POST -H "Content-Type: application/json" \
  -d '["Hello Flink", "Spring Boot Integration"]' \
  http://localhost:8080/jobs/wordcount
  3. Check the console output:
Flink> Spring : 1
Flink> Boot : 1
Flink> Integration : 1
...

VIII. Production Considerations

  1. Cluster deployment: submit the packaged jar to a Flink cluster

    flink run -c com.example.demo.DemoApplication your-application.jar
  2. State management: integrate a Flink state backend (such as RocksDB); see the sketch after this list

  3. Monitoring: hook into Spring Boot Actuator via Micrometer

  4. Resource isolation: use the YARN or Kubernetes deployment modes
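As a starting point for item 2, here is a minimal sketch of enabling checkpointing with the embedded RocksDB state backend; it assumes the flink-statebackend-rocksdb dependency is on the classpath, and the checkpoint path is illustrative:

// Sketch: enable checkpointing and the embedded RocksDB state backend.
// Extra dependency: org.apache.flink:flink-statebackend-rocksdb
// Extra import: org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                        // checkpoint every 60 seconds
env.setStateBackend(new EmbeddedRocksDBStateBackend()); // keep keyed state in RocksDB
env.getCheckpointConfig()
   .setCheckpointStorage("file:///tmp/flink-checkpoints"); // illustrative checkpoint path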


IX. Complete Project Structure

src/
├── main/
│   ├── java/
│   │   └── com/example/demo/
│   │       ├── DemoApplication.java
│   │       ├── flink/
│   │       │   └── WordCountJob.java
│   │       ├── controller/
│   │       └── service/
│   └── resources/
│       └── application.properties
pom.xml

With the steps above you have a working integration of Spring Boot and Apache Flink. This architecture is particularly well suited to scenarios that need real-time stream processing embedded in a microservice landscape, such as real-time risk control systems or IoT data processing platforms. It can later be extended with big-data components such as Kafka and HBase.