Flink实战教程从入门到精通(基础篇)(二)Flink快速上手_flink菜鸟教程
目录
前言:
一、环境准备
二、创建项目
1.创建工程
2、添加项目依赖
三、WordCount代码编写(有界流)
1、批处理和流处理
2、数据准备
3、编写代码
1、DataSet API (不推荐)(批处理)
2、DataStreaming(流处理)
总结:
四、WordCount代码编写(无界流)
1、代码编写
2、测试数据
Flink其他文章请点击:
Flink实战教程从入门到精通(基础篇)(一)Flink简介-CSDN博客
Flink实战教程从入门到精通(基础篇)(二)Flink快速上手_flink菜鸟教程-CSDN博客
Flink实战教程从入门到精通(基础篇)(三)Flink集群部署_flink集群搭建步骤-CSDN博客
Flink实战教程从入门到精通(基础篇)(四)Flink部署-Standalone运行模式_flink教程菜鸟教程-CSDN博客
Hadoop集群搭建(hdfs、yarn)_hadoop yarn 集群-CSDN博客
Flink实战教程从入门到精通(基础篇)(五)Flink部署-YARN运行模式_flink 1.17.1 yarn模式执行都需要哪些jar包-CSDN博客 Flink实战教程从入门到精通(基础篇)(六)Flink运行时架构_flink教程-CSDN博客
前言:
对 Flink有了基本的了解后,接下来就要理论联系实际,真正上手写代码了。Flink底层是以Java编写的,并为开发人员同时提供了完整的Java和ScalaAPI。在本书中,代码示例将全部用Java 实现;而在具体项目应用中,可以根据需要选择合适语言的API进行开发。在这一章,我们将会以大家最熟悉的 IntelliJIDEA 作为开发工具,用实际项目中最常见的Maven 作为包管理工具,在开发环境中编写一个简单的 Flink项目,实现零基础快速上手。
一、环境准备
1、小编的本地测试系统环境为 Windows 11。
2、需提前安装 Java 8。
3、集成开发环境(IDE)使用IntelliJIDEA,具体的安装流程参见IntelliJ官网。
4、安装 IntelliJIDEA之后,还需要安装一些插件--Maven和Git。Maven 用来管理项目依赖;通过Git 可以轻松获取我们的示例代码,并进行本地代码的版本控制。
二、创建项目
1.创建工程
打开IntellJIDEA,创建一个 Maven 工程,如图 1-1 所示
图:1-1
导入Maven仓库,File-> Settings->Maven,如图 1-2 所示
图:1-2
2、添加项目依赖
在项目的 pom 文件中,增加标签设置属性,然后增加标签引入需要的依赖。我们需要添加的依赖最重要的就是Flink的相关组件,包括fink-java、flink-streaming-java,以及 ink-clients(客户端,也可以省略)。另外,为了方便查看运行日志,我们引入 slf4i 和 log4i 进行日志管理。
将下面依赖导入刚新建项目的pom.xml中
1.17.0 org.apache.flink flink-streaming-java ${flink.version} org.apache.flink flink-clients ${flink.version}
如下图 1-3:
图 1-3
三、WordCount代码编写(有界流)
需求:统计一段文字中,每个单词出现的频次。
环境准备:在 src/main/java目录下,新建一个包,命名为cn.konne.wc。
1、批处理和流处理
批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
流处理基本思路:一条一条的读取数据,每读到一条数据后计算结果,保存本地内存的状态,下一条数据进来后,从内存中读取之前状态,并进行计算。
2、数据准备
在工程根目录下新建一个input 文件夹,并在下面创建文本文件 words.txte。
在words.txt中输入一些文字,例如:
hello flinkhello worldehello iava
3、编写代码
在cn.konne.wc下面新建WordCountTest类,在类中编写main方法。并且将待会需要操作的每个步骤进行梳理,见图1-4:
图 1-4
具体代码如下:
1、DataSet API (不推荐)(批处理)
package cn.konne.wc;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.operators.AggregateOperator;import org.apache.flink.api.java.operators.DataSource;import org.apache.flink.api.java.operators.FlatMapOperator;import org.apache.flink.api.java.operators.UnsortedGrouping;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.util.Collector;/** * @author MJ * @date 2025/3/19 */public class WordCountTest { public static void main(String[] args) throws Exception { // TODO 1.创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // TODO 2.读取数据:从文件中读取 DataSource lonDs = env.readTextFile(\"/input/words.txt\"); // TODO 3.切分、转换(word,1) FlatMapOperator<String, Tuple2> wordAndOne = lonDs.flatMap(new FlatMapFunction<String, Tuple2>() { @Override public void flatMap(String value, Collector<Tuple2> out) throws Exception { // TODO 按照空格 切分单词、 String[] words = value.split(\" \"); // TODO 将单词转为为(word,1) for (String word : words) { Tuple2 stringIntegerTuple2 = Tuple2.of(word, 1); // TODO 使用 Collector 向下游发送消息 out.collect(stringIntegerTuple2); } } }); // TODO 4.按照 word 分组 UnsortedGrouping<Tuple2> tuple2UnsortedGrouping = wordAndOne.groupBy(0); // TODO 5.各分组内聚台 AggregateOperator<Tuple2> sum = tuple2UnsortedGrouping.sum(1); // TODO 6.输出/ sum.print(); }}
输出结构如下:
(java,1)(flink,1)(world,1)(he11o,3)
需要注意的是,这种代码的实现方式,是基于DataSetAPI的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink1.12开始,官方推荐的做法是直接使用 DataStreamAP,在提交任务时通过将执行模式设为 BATCH来进行批处理:←S bin/flink run -Dexecution,runtime-mode=BATCH BatchWordCount.jar。
这样,DataSet API就没什么用了,在实际应用中我们只要维护一套 DataStream API就可以。这里只是为了方便大家理解,我们依然用 DataSetAPI做了批处理的实现。
2、DataStreaming(流处理)
package cn.konne.wc;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.KeyedStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;/** * DataStreaming */public class WordCountStream { public static void main(String[] args) throws Exception { // TODO 1、创建环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // TODO 2、读取数据 DataStreamSource lineDs = env.readTextFile(\"D:\\\\JAVA\\\\konne\\\\words.txt\"); // TODO 3、处理数据 SingleOutputStreamOperator<Tuple2> wordAndOne = lineDs.flatMap(new FlatMapFunction<String, Tuple2>() { @Override public void flatMap(String value, Collector<Tuple2> out) throws Exception { String[] s = value.split(\" \"); for (String string : s) { out.collect(new Tuple2(string, 1)); } } }); // TODO 4、分组 KeyedStream<Tuple2, String> wordAndOneKS = wordAndOne.keyBy(new KeySelector<Tuple2, String>() { @Override public String getKey(Tuple2 value) throws Exception { return value.f0; } }); // TODO 5、聚合 SingleOutputStreamOperator<Tuple2> sum = wordAndOneKS.sum(1); // TODO 6、输出数据 sum.print(); // TODO 7、执行 env.execute(); }}
输出:
7> (worlde,1)7> (flink,1)3> (hello,1)3> (hello,2)3> (hello,3)8> (iava,1)
总结:
对于批处理和流处理输出的结果,可以发现批处理的hello是一次性计算出的结果,这就是批处理的特点。而流处理的hello输出了三次,体现了流处理数据一条一条计算的结果。
四、WordCount代码编写(无界流)
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。
将 StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取 socket文本流的方法socketTextStream。具体代码实现如下:
1、代码编写
package cn.konne.un;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.KeyedStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;public class WordCountUn { public static void main(String[] args) throws Exception { // TODO 1、创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // TODO 2、读取数据 从socket读取数据 DataStreamSource socketDs = env.socketTextStream(\"hadoop102\", 9999); // TODO 3、处理数据 SingleOutputStreamOperator<Tuple2> tuple2SingleOutputStreamOperator = socketDs .flatMap( (String value, Collector<Tuple2> out) -> { String[] words = value.split(\" \"); for (String word : words) { out.collect(new Tuple2(word, 1)); } } ).returns(Types.TUPLE(Types.STRING, Types.INT)); // TODO 4、聚合 KeyedStream<Tuple2, String> tuple2StringKeyedStream = tuple2SingleOutputStreamOperator.keyBy((Tuple2 value) -> { return value.f0; }); // TODO 5、输出数据 SingleOutputStreamOperator<Tuple2> sum = tuple2StringKeyedStream.sum(1); sum.print(); // TODO 6、执行 env.execute(); }}