> 文档中心 > Flink使用指南:状态计算完全搞懂了,你就是大佬!

Flink使用指南:状态计算完全搞懂了,你就是大佬!

嗯,今天我们来聊聊Flink的状态与容错机制,这可是搞实时计算的必修课哦!为什么呢?因为在处理数据流的时候,中间结果得存在某个地方,也就是所谓的“状态”。但是,程序可能会炸掉,机器可能会出故障,这时候如果数据丢了、重启之后还能接着从断掉的地方继续算,这就是“容错”的魅力所在。

文章里提到了Checkpoint机制、Keyed State和Operator State这些核心概念。Checkpoint简单概括就是定期拍一张照片,记录当时的状态,这样断了之后可以恢复。开启Checkpoint看起来很简单,但参数设置可不是闹着玩的,比如间隔时间、模式、超时时间等等,得根据实际情况调调试试。

再说说Keyed State和Operator State的区别吧。Keyed State是和Key相关的,比如每个人的历史记录,只能在KeyBy之后的操作里用。而Operator State就是每个并行任务自己的小本本,像Kafka消费者每个分区的offset就是 Operator State的典型应用。

Managed State和Raw State的选择也很重要。Managed State是Flink托管的,支持更多功能,但需要自己实现序列化器。Raw State虽然灵活,但维护起来麻烦,因此一般情况下还是用Managed State靠谱哦。

总之,状态管理是保证数据处理可靠性的关键,理解并正确使用这些机制,才能让实时计算系统稳稳地跑,保障数据不丢不重。下次再聊聊具体的Checkpoint算法和实战案例,感兴趣的朋友记得继续关注哦。

系列文章目录

Flink使用指南: 面试必问内存管理模型,进大厂一定要知道!

Flink使用指南: Kafka流表关联HBase维度表

Flink使用指南: Watermark新版本使用

Flink使用指南: Flink SQL自定义函数

目录

系列文章目录

前言

一、Checkpoint机制

如何开启Checkpoint

二、Keyed State 和 Operator State

原始状态和托管状态

如何使用Managed Keyed State

状态的生命周期(TTL)

如何使用Managed Operator  State

三. checkpoint算法

总结

前言

什么是状态与容错?

在使用Flink做实时计算时,计算中间结果如何存储,这叫状态;当想升级某个程序代码或者某个程序异常退出等事故情况,Flink如果能保证数据准备性,这叫容错。

Flink的状态与容错主要分为一下几个知识点:

  • Checkpoint机制
  • Savepoint机制
  • State Backends机制

一、Checkpoint机制

  • 如何开启Checkpoint

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每 1000ms 开始一次 checkpointenv.enableCheckpointing(1000);// 高级选项:// 设置模式为精确一次 (这是默认值)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 确认 checkpoints 之间的时间会进行 500 msenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// Checkpoint 必须在一分钟内完成,否则就会被抛弃env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 开启在 job 中止后仍然保留的 externalized checkpointsenv.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

更多参数配置请参考Flink配置文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html

Flink的Checkpoint机制会将operator操作进行快照存储到State Backends中,默认情况,状态是保持在 TaskManagers 的内存中,checkpoint 保存在 JobManager 的内存中。为了合适地持久化大体量状态, Flink 支持各种各样的途径去存储 checkpoint 状态到其他的 state backends 上。Flink 现在为没有迭代(iterations)的作业提供一致性的处理保证。在迭代作业上开启 checkpoint 会导致异常。为了在迭代程序中强制进行 checkpoint,用户需要在开启 checkpoint 时设置一个特殊的标志:env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)。

二、Keyed State 和 Operator State

首先,flink中的state分为两种:Keyed State 和 Operator State。

Keyed State:keyed state始终与key相关,所以只能在KeyedStream的函数和算子中使用keyed state。可以理解为,KeyedStream的算子或者函数按照key将数据流进行分区,每个key就是一个分区,而每个分区都保存着一个的keyed state。

以后版本中,可能会将keyed state改为Key Groups,Key Groups就是一个flink实例被分配到的所有key的组合。所以Key Groups的数量等于设置的并行度。

Operator State:Operator State即non-keyed state。算子操作或者非键控函数的每个并行任务都会绑定一个Operator State。如kafka连接器就是一个很好的例子:kafka消费者的每个分区都会维护一个map类型的数据,作为状态保存topic、分区和offset。当并行度发生变化时,Operator State支持重新分配状态。

原始状态和托管状态

keyed state和Operator State可以有两种形式存在:managed (托管)raw(原始)。

Managed State:Managed State在运行时由flink控制,保存在哈希表、RocksDB等结构化数据中。如ValueState、ListState。flink会对Managed State编码,并写入checkpoint。

Raw State:Raw State是以自定义的数据类型保存的状态信息。在写入checkpoint时,作为二进制序列写入checkpoint中。所以flink不知道Operator State的数据结构,只能获得原始的二进制序列。通常情况下,使用managed state居多。

所有的flink函数都可以使用Managed State,但是如果需要使用Raw State,则需要在函数内实现相应的接口。相较于Raw State,官方更推荐使用Managed State,使用Managed State时,支持修改并行度后自动重新分配状态,且具备更完善的内存管理。

注意:如果使用Managed State时需要自定义序列化器

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/stream/state/custom_serialization.html

如何使用Managed Keyed State

上面说到,flink的状态可以保存所有元素、聚合结果、历史数据等,flink提供了相应的接口以实现这些功能。另外,keyed state顾名思义,必须在stream.keyBy(…)之后使用,否则会报错。

下面是flink提供的状态数据类型:

ValueState: 仅保存一个可更新、可检索的值。作用域为输入元素的键,即每个key保存一个类型的状态。可以使用update(T)方法更新状态,或使用value()方法获取状态。

ListState: 保存一个列表的状态。可以对这个列表进行追加写或者检索。使用add(T)或者addAll(List)方法添加数据。使用get()方法可获得一个可迭代对象,可以在这个对象中检索数据。也可以使用update(List)覆盖所有的数据。

ReducingState: 保存一个唯一值,这个值是当前所有元素的预聚合结果。这个接口与ListState 相似,区别在于ReducingState的add()方法是调用ReduceFunction方法,将当前元素与之前的预聚合结果进行计算,再保存新的预聚合结果。

AggregatingState:保存一个唯一值,这个值是当前所有元素的预聚合结果。与ReducingState的区别在于AggregatingState的输入类型和输出类型可以不一致,AggregatingState分别定义输入、输出两个参数的数据类型。add(IN)内部调用的是AggregateFunction方法。

MapState: 保存一个列表的map类型的状态。可以使用put方法向其添加k-v类型的键值对,也可以用于检索。使用 put(UK, UV) 或者putAll(Map) 方法添加数据;使用entries()keys() 和values() 来检索key和value。使用 isEmpty()判断是否存在数据。

所有的状态类型都有一个clear()方法,用于清空当前key的状态中所有的数据。

重点提示一:以上的状态类型对象仅仅只是作为一个状态的接口而已,状态不一定是存储在以上对象里面的,还可以存在本地磁盘或者其他地方。

重点提示二:你从状态中获取的value取决于当前输入元素的key,所以,你调用的同一个函数会根据不同的key返回不同的value。

获取状态时,必须创建一个StateDescriptor对象用于描绘状态的名称和数据类型,还可能包含自定义的函数,如ReduceFunction。状态通过RuntimeContext调用getState方法来获取,所以必须是富函数才能获取状态。

获取不同的状态对应方式如下:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState getAggregatingState(AggregatingStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDe