模拟flink处理无限数据流
如果没有在 linux 环境下安装 flink ,先看我的上一篇文章:如何搭建Linux环境下的flink本地集群-CSDN博客
使用工具:IntelliJ IDEA 2021,Maven 3.6.1
第一步,创建一个空的 Maven 项目,导入依赖
4.0.0 org.example flinkLearn 1.0-SNAPSHOT org.apache.flink flink-runtime-web ${flink.version} ch.qos.logback logback-classic 1.2.11 org.apache.flink flink-streaming-java ${flink.version} org.apache.flink flink-clients ${flink.version} 8 8 1.17.0 org.apache.maven.plugins maven-shade-plugin 3.2.4 package shade com.google.code.findbugs:jsr305 org.slf4j:* log4j:* *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA
等等依赖下载完毕
第二步,编写分词处理无界流代码
此刻先不要运行,因为还没有数据源
import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;public class StreamNoBundedWordCount { public static void main(String[] args) throws Exception{ //拿到执行环境 StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment(); //数据源 source //注:加 returns 是因为 lambda 表达式无法识别二元组中的类型,故手动指定以避免报错 DataStreamSource source = evn.socketTextStream(\"localhost\", 8888); SingleOutputStreamOperator<Tuple2> sum = source.flatMap((String value, Collector<Tuple2> out) -> { String[] words = value.split(\" \"); for (String word : words) { out.collect(Tuple2.of(word, 1)); } }) .returns(Types.TUPLE(Types.STRING,Types.INT)) .keyBy((Tuple2 value) -> value.f0) .sum(1); //写入,sink sum.print(); //执行 evn.execute(); }}
第三步,进入 linux 环境,安装 netcat 模拟数据源发送数据
安装 netcat
sudo apt updatesudo apt install netcat
验证安装情况
nc --version
模拟数据源,监听 8888 端口
nc -lk 8888
第四步,启动程序
点击 绿色按钮 启动 Main 程序后
在 netcat 中发送数据比如 hello world

可以看到程序可以正常运行
第五步,打包并上传到 linux 中的 flink 集群中
点开右侧的 Maven 选项,找到生命周期,先 clean 一下,将多余文件清除
再 packge
成功后会生成一个 target 文件夹,找到其中的 jar 包,会生成两个

两个都能用,上面那个东西要少一些
然后我们将 jar 包上传到 flink 的 web UI 上

点击 add new
找到我们刚刚生成的 jar 包 并点击 \"打开\" 上传

点击 jar 包,为其指定启动类,指定并行度,这里我写的 2
可以看到 作业启动成功

接下来,我们再 netcat 中发送数据 hello
可以看到, 集群中的 task Managers 成功接收到了数据并进行词频统计



