> 技术文档 > 模拟flink处理无限数据流

模拟flink处理无限数据流

如果没有在 linux 环境下安装 flink ,先看我的上一篇文章:如何搭建Linux环境下的flink本地集群-CSDN博客

使用工具:IntelliJ IDEA 2021,Maven 3.6.1

第一步,创建一个空的 Maven 项目,导入依赖

 4.0.0 org.example flinkLearn 1.0-SNAPSHOT   org.apache.flink flink-runtime-web ${flink.version}   ch.qos.logback logback-classic 1.2.11   org.apache.flink flink-streaming-java ${flink.version}   org.apache.flink flink-clients ${flink.version}    8 8 1.17.0     org.apache.maven.plugins maven-shade-plugin 3.2.4    package  shade      com.google.code.findbugs:jsr305  org.slf4j:*  log4j:*        *:*    META-INF/*.SF  META-INF/*.DSA  META-INF/*.RSA               

等等依赖下载完毕

第二步,编写分词处理无界流代码

此刻先不要运行,因为还没有数据源

import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;public class StreamNoBundedWordCount { public static void main(String[] args) throws Exception{ //拿到执行环境 StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment(); //数据源 source //注:加 returns 是因为 lambda 表达式无法识别二元组中的类型,故手动指定以避免报错 DataStreamSource source = evn.socketTextStream(\"localhost\", 8888); SingleOutputStreamOperator<Tuple2> sum = source.flatMap((String value, Collector<Tuple2> out) -> { String[] words = value.split(\" \"); for (String word : words) { out.collect(Tuple2.of(word, 1)); } }) .returns(Types.TUPLE(Types.STRING,Types.INT)) .keyBy((Tuple2 value) -> value.f0) .sum(1); //写入,sink sum.print(); //执行 evn.execute(); }}

 

第三步,进入 linux 环境,安装 netcat 模拟数据源发送数据

安装 netcat

sudo apt updatesudo apt install netcat

验证安装情况

nc --version

模拟数据源,监听 8888 端口

nc -lk 8888

第四步,启动程序

点击 绿色按钮 启动 Main 程序后

在 netcat 中发送数据比如 hello world

可以看到程序可以正常运行

第五步,打包并上传到 linux 中的 flink 集群中

 点开右侧的 Maven 选项,找到生命周期,先 clean 一下,将多余文件清除

再 packge

成功后会生成一个 target 文件夹,找到其中的 jar 包,会生成两个

两个都能用,上面那个东西要少一些 

然后我们将 jar 包上传到 flink 的 web UI 上

点击 add new

找到我们刚刚生成的 jar 包 并点击 \"打开\" 上传

点击 jar 包,为其指定启动类,指定并行度,这里我写的 2

 可以看到 作业启动成功

接下来,我们再 netcat 中发送数据 hello

可以看到, 集群中的 task Managers 成功接收到了数据并进行词频统计