[Flink 1.14 in Action] Docker Environment: DataStream API Operator Roundup
Contents

- Flink DataStream API
  - Overview
  - Map
  - FlatMap
  - Filter
  - KeyBy
  - Reduce
  - Window
  - WindowAll
  - WindowReduce
  - IterativeStream
  - ProcessFunction
Flink DataStream API
Overview
Operators transform one or more DataStreams into a new DataStream; an application can combine multiple transformations into a sophisticated dataflow topology.
This section describes the basic transformations in the Flink DataStream API, the data partitioning schemes available after a transformation, and operator chaining strategies.
Map
DataStream → DataStream
Takes one element and produces one element. A map function that doubles the values of the input stream:
```java
package quick;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env
                .fromElements(1, 2, 3, 4, 5)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) throws Exception {
                        return 2 * value;
                    }
                });
        dataStream.print();
        env.execute("MapExample");
    }
}
```
FlatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:
```java
package quick;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env
                .fromElements("Flink Spark Storm", "Flink Flink Flink")
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        for (String word : value.split(" ")) {
                            out.collect(word);
                        }
                    }
                });
        dataStream.print();
        env.execute("FlatMapExample job");
    }
}
```
Filter
DataStream → DataStream
Evaluates a boolean predicate for each element and retains those for which it returns true. A filter that drops zero values:
```java
package quick;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env
                .fromElements(0, 1, 2, 3, 4, 5)
                .filter(new FilterFunction<Integer>() {
                    @Override
                    public boolean filter(Integer value) throws Exception {
                        return value != 0;
                    }
                });
        dataStream.print();
        env.execute("FilterExample job");
    }
}
```
KeyBy
**DataStream → KeyedStream**
Logically splits a stream into disjoint partitions, each containing the elements with the same key. Internally this is implemented with hash partitioning.
A running sum per key:
```java
package quick;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<Integer, Integer>> dataStream = env
                .fromElements(Tuple2.of(1, 1), Tuple2.of(1, 2), Tuple2.of(2, 2), Tuple2.of(2, 2))
                .keyBy(value -> value.f0)
                .sum(1);
        dataStream.print();
        env.execute("KeyByExample job");
    }
}
```
Output:

```
(1,1)
(1,3)
(2,2)
(2,4)
```
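The hash partitioning behind keyBy can be illustrated with a simplified sketch. This is not Flink's actual implementation (the runtime murmur-hashes each key into a key group and maps key groups to parallel subtasks); `partitionFor` is a hypothetical helper that only conveys the core guarantee: equal keys always land on the same partition.

```java
public class KeyPartitioning {
    // Simplified stand-in for Flink's key-group assignment: mask off the
    // sign bit and take the hash modulo the parallelism. Equal keys
    // therefore always map to the same subtask index.
    public static int partitionFor(Object key, int parallelism) {
        return (key.hashCode() & Integer.MAX_VALUE) % parallelism;
    }
}
```

With parallelism 4, every record with key 1 goes to the same subtask, which is why `sum(1)` can keep one running total per key.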
Reduce
**KeyedStream → DataStream**

A rolling aggregation on a keyed data stream: combines the current element with the last aggregated value and produces a new value. The returned stream contains the result of every aggregation step, not just the final result of the last one.
The following example behaves the same as sum:
```java
package quick;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<Integer, Integer>> dataStream = env
                .fromElements(Tuple2.of(1, 1), Tuple2.of(1, 2), Tuple2.of(2, 2), Tuple2.of(2, 2))
                .keyBy(value -> value.f0)
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
                                                           Tuple2<Integer, Integer> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });
        dataStream.print();
        env.execute("ReduceExample job");
    }
}
```
Output:

```
(1,1)
(1,3)
(2,2)
(2,4)
```
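The per-element output above reflects the rolling nature of reduce: one partial aggregate is emitted for every input record. A minimal plain-Java sketch of the same semantics, with no Flink dependency (`rollingSum` is a hypothetical helper, not a Flink API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingReduce {
    // Emits one partial aggregate per input element, keyed by the first
    // field, mirroring how keyBy(...).reduce(...) produces a result for
    // every record rather than only a final total.
    public static List<int[]> rollingSum(List<int[]> input) {
        Map<Integer, Integer> state = new HashMap<>();
        List<int[]> out = new ArrayList<>();
        for (int[] t : input) {
            int acc = state.merge(t[0], t[1], Integer::sum); // update per-key state
            out.add(new int[]{t[0], acc});                   // emit the running value
        }
        return out;
    }
}
```

Feeding it (1,1), (1,2), (2,2), (2,2) reproduces the four partial results shown above.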
Window
KeyedStream → WindowedStream
Windows can be defined on already partitioned KeyedStreams. A window groups the data of each key according to some characteristic (for example, the data that arrived within the last 5 seconds).
```java
package quick;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                // the socket source assigns no event-time timestamps or watermarks,
                // so use processing-time windows here
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);
        dataStream.print();
        env.execute("WindowExample");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```
To run the example program, first start the input stream from a terminal with netcat:

```shell
nc -lk 9999
```

Then type a few words and press Enter.
WindowAll
DataStream → AllWindowedStream
Windows can be defined on regular DataStreams. A window groups all stream events according to some characteristic (for example, the data that arrived within the last 5 seconds).
```java
package quick;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import quick.source.DataSource;

import java.util.List;

public class WindowAllExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple3<String, String, Integer>> dataStream = inStream
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // note: the aggregated field is f2
                .maxBy(2);
        dataStream.print();
        env.execute("WindowAllExample job");
    }

    /**
     * Simulates a continuously emitting source.
     */
    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            for (Tuple3<String, String, Integer> tuple3 : tuple3List) {
                if (!running) {
                    break;
                }
                ctx.collect(tuple3);
                System.out.println("----" + tuple3);
                // emit one element per second
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
```
Defining the data:
```java
package quick.source;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.Arrays;
import java.util.List;

/**
 * Shared sample data.
 */
public class DataSource {

    /**
     * Sample data set.
     * Tuple3 is an entity class with exactly three fields, exposed as f0, f1 and f2,
     * matching the three constructor arguments.
     */
    public static List<Tuple3<String, String, Integer>> getTuple3ToList() {
        return Arrays.asList(
                new Tuple3<>("张三", "man", 20),
                new Tuple3<>("李四", "girl", 24),
                new Tuple3<>("王五", "man", 29),
                new Tuple3<>("刘六", "girl", 32),
                new Tuple3<>("伍七", "girl", 18),
                new Tuple3<>("吴八", "man", 30)
        );
    }
}
```
Output:

```
----(张三,man,20)
----(李四,girl,24)
----(王五,man,29)
----(刘六,girl,32)
----(伍七,girl,18)
(刘六,girl,32)
----(吴八,man,30)
```
Note:
max/min aggregate only the field the user specifies (the values of the other fields are not guaranteed to be correct), whereas maxBy/minBy return the complete record that contains the extreme value.
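The difference can be sketched in plain Java over the tuple fields. This is a hypothetical simulation of the semantics, not Flink's implementation: `max` updates only the compared field in a running record, while `maxBy` replaces the whole record.

```java
import java.util.List;

public class MaxVsMaxBy {
    // max(field): keep a running record; only the compared field is
    // updated, the remaining fields stay as in the first record seen.
    public static String[] max(List<String[]> records, int field) {
        String[] acc = records.get(0).clone();
        for (String[] r : records) {
            if (Integer.parseInt(r[field]) > Integer.parseInt(acc[field])) {
                acc[field] = r[field];
            }
        }
        return acc;
    }

    // maxBy(field): return the entire record holding the maximum field value.
    public static String[] maxBy(List<String[]> records, int field) {
        String[] acc = records.get(0);
        for (String[] r : records) {
            if (Integer.parseInt(r[field]) > Integer.parseInt(acc[field])) {
                acc = r;
            }
        }
        return acc;
    }
}
```

On the sample data, `maxBy` over the age field yields the full record (刘六,girl,32), while `max` would yield a record whose name and gender still come from the first element.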
WindowReduce
WindowedStream → DataStream
Applies a ReduceFunction to the window and returns the reduced value; here the effect is the same as sum.
```java
package quick;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowReduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                          Tuple2<String, Integer> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });
        dataStream.print();
        env.execute("WindowReduce job");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```
Test:

```shell
$ yum install nc -y
$ nc -lk 9999
hello world
```
IterativeStream
DataStream → IterativeStream → ConnectedStream
Creates a "feedback" loop in the dataflow by redirecting the output of one operator to some earlier operator. This is especially useful for algorithms that continuously update a model. The following code starts from a stream and applies the iteration body repeatedly: elements greater than 0 are sent back through the feedback channel, and the remaining elements are forwarded downstream.
```java
package quick;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterativeStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> someIntegers = env.generateSequence(0, 100);
        // create the iterative stream
        IterativeStream<Long> iteration = someIntegers.iterate();
        // the iteration body: subtract one from each element
        DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return value - 1;
            }
        });
        // elements that should be iterated again
        DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return (value > 0);
            }
        });
        // close the feedback loop with the stream to iterate
        iteration.closeWith(stillGreaterThanZero);
        // elements less than or equal to zero continue downstream
        DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return (value <= 0);
            }
        });
        minusOne.print();
        env.execute("IterativeStream job");
    }
}
```
ProcessFunction
[Example] Maintain a count per key in the data stream, and emit a key/count pair whenever a key has received no update for one minute (in event time).
1) First, import the required dependencies:
```java
package com.xueai8;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;
```
2) Define the data structure (model) that holds the state:
```java
/**
 * The data type stored in state.
 */
public class CountWithTimestamp {
    public String key;          // the key
    public long count;          // the running count
    public long lastModified;   // the time of the last modification
}
```
3) Implement a custom ProcessFunction by extending KeyedProcessFunction:
```java
public class CountWithTimeoutFunction
        extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state maintained by this process function. */
    private ValueState<CountWithTimestamp> state;

    // Obtain the state maintained by this process function,
    // accessing Flink's keyed state through the RuntimeContext.
    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    // Called for every event received on the input stream.
    // For each record, increment the counter and set the last-modified timestamp.
    @Override
    public void processElement(
            Tuple2<String, String> value,
            Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }
        // update the count
        current.count++;
        // set the state's timestamp to the record's assigned event-time timestamp
        current.lastModified = ctx.timestamp();
        // write the state back
        state.update(current);
        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    // If no further update arrives within one minute, emit the key/count pair.
    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        // get the state for the key that scheduled this timer
        CountWithTimestamp result = state.value();
        // check whether this is an outdated timer or the latest one
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
```
4) Apply the custom process function in the streaming job's main method:
```java
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink uses processing time by default; switch to event time:
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // the source data stream
        DataStream<Tuple2<String, String>> stream = env
                .fromElements("good good study", "day day up", "you see see you")
                .flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, String>> collector) throws Exception {
                        for (String word : line.split("\\W+")) {
                            collector.collect(new Tuple2<>(word, "1"));
                        }
                    }
                });
        // the simulated data carries no timestamps, so assign timestamps and watermarks here
        DataStream<Tuple2<String, String>> withTimestampsAndWatermarks =
                stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, String> element) {
                        return System.currentTimeMillis();
                    }
                });
        // apply the process function on the keyed stream
        DataStream<Tuple2<String, Long>> result = withTimestampsAndWatermarks
                .keyBy(0)
                .process(new CountWithTimeoutFunction());
        // print the result
        result.print();
        // execute the streaming job
        env.execute("Process Function");
    }
}
```