Flink框架中的时间语义和Watermark(数据标记)
大家好,我是百思不得小赵。
创作时间:2022 年 5 月 25 日
博客主页: 🔍点此进入博客主页
—— 新时代的农民工 🙊
—— 换一种思维逻辑去看待这个世界 👀
今天是加入CSDN的第1179天。觉得有帮助麻烦👏点赞、🍀评论、❤️收藏
目录
-
- 时间语义
- Watermark(水位线)
- 总结
时间语义
“时间”在我们日常的开发学习过程中是特别常见的一个名词,例如:Java中的日期处理类、获取系统的当前时间、毫秒级的时间戳等等。接下来让我们来看看在Flink框架中,对时间不同的概念。Flink框架中有三个时间的语义:事件时间(Event Time )、摄入时间(Ingestion Time)、系统处理时间(Processing Time)。
- Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
- Ingestion Time:数据进入 Flink 的时间。
- Processing Time:每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。
在Flink流处理真实场景中,大部分的业务需求都会使用事件时间语义,但还是以具体的业务需求择选不同的时间语义。Flink引入EventTime方式如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 设置全局事件时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子.
Watermark(水位线)
在Flink数据处理过程中,数据从产生到计算到输出结果,是需要一个过程时间,在正常的情况下数据往往都是按照事件产生的时间顺序进行的,由于网络、分布式部署等原因会导致数据产生乱序问题,相当于Flink接收到的数据的先后顺序不是按照时间的事件时间顺序排列进行的。乱序数据会让窗口计算不准确.。如何避免这个问题呢?
如果一个数据的时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
Watermark概念
- Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发
- Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark 机制结合 window 来实现。
- 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。
Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定 eventTime小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。
当 Flink 接收到数据时,会按照一定的规则去生成 Watermark,这条 Watermark就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是基于数据携带的时间戳生成的,一旦 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 event time 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。
举个栗子:如下图为一个乱序的 数据,将Watermark设置为2
如上图,将最大的延时时间设置为2秒,所以时间戳为 7s 的事件对应的 Watermark 是 5s,时间戳为 12s 的事件的 Watermark 是 10s,如果我们的窗口 1是 1s~5s,窗口 2 是 6s~10s,那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时的 Watermark 恰好触发窗口 2。
Watermark的特点
- 相当于一条特殊的数据记录
- 必须是单调递增的,一旦确定无法回滚,以确保任务事件时间在向前推进
- 与每条数据的时间戳强相关
Watermark的使用
对于排序好的数据,不需要延迟触发,只需要指定时间戳即可。
代码如下:
public class WindowTest3_EventTimeWindow { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 设置全局事件时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(100); DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777); DataStream<SensorReading> dataStream = inputStream.map(line -> { String[] field = line.split(","); return new SensorReading(field[0], new Long(field[1]), new Double(field[2])); }) // 升序数据设置事件时间和watermark // 无需设置延时时间 把当前的getTimestamp提取出当成当前的事件时间 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() { @Override public long extractAscendingTimestamp(SensorReading element) { return element.getTimestamp() * 1000; } }); // 基于事件时间的开窗聚合 统计15秒内温度的最小值 SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id") .timeWindow(Time.seconds(15)) .minBy("temperature"); minTempStream.print("minTemp"); env.execute(); }}
处理乱序数据需要制定对应的延时时间。
代码如下:
public class WindowTest3_EventTimeWindow { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 设置全局事件时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(100); DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777); DataStream<SensorReading> dataStream = inputStream.map(line -> { String[] field = line.split(","); return new SensorReading(field[0], new Long(field[1]), new Double(field[2])); }) // 乱序数据设置事件时间和watermark // 需要设置延时时间 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) { @Override public long extractTimestamp(SensorReading element) { return element.getTimestamp() * 1000; } }); // 基于事件时间的开窗聚合 统计15秒内温度的最小值 SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id") .timeWindow(Time.seconds(15)) .minBy("temperature"); minTempStream.print("minTemp"); env.execute(); }}
TimestampAssigner
Flink日供了TimestampAssigner接口,我们可以自定义去实现从数据中抽取时间戳的规则以及生成Watermark的规则。有如下两种类型:
- AssignerWithPeriodicWatermarks
周期性的生成 watermark:系统会周期性的将 watermark 插入到流中(水位线也是一种特殊的事件!)。默认周期是 200 毫秒。如下方式进行修改
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setAutoWatermarkInterval(100);
- AssignerWithPunctuatedWatermarks
没有时间周期,间断式地生成 watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理
总结
- 在flink开发过程中,Watermark的使用由开发人员生成。
- 若watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
- 若watermark到达得太早,则可能收到错误结果,但Flink对延时数据的处理机制可以友好解决。
- Flink如何解决数据的乱序问题,提供了三种处理机制:使用Watermark、设置窗口延时 (allowedLateness)、设置侧流(sideOutputLateData0
public class WindowTest3_EventTimeWindow { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 设置全局事件时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(100); DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777); DataStream<SensorReading> dataStream = inputStream.map(line -> { String[] field = line.split(","); return new SensorReading(field[0], new Long(field[1]), new Double(field[2])); }) // 乱序数据设置事件时间和watermark // 需要设置延时时间 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) { @Override public long extractTimestamp(SensorReading element) { return element.getTimestamp() * 1000; } }); OutputTag<SensorReading> tag = new OutputTag<>("late"); // 基于事件时间的开窗聚合 统计15秒内温度的最小值 SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id") .timeWindow(Time.seconds(15)) // 设置窗口延时 .allowedLateness(Time.seconds(1)) // 将窗口关闭后的数据输出到侧流 .sideOutputLateData(tag) .minBy("temperature"); minTempStream.print("minTemp"); minTempStream.getSideOutput(tag).print("late"); env.execute(); }}