> 技术文档 > Flink-Checkpoint机制详解:(第41天)_flink checkpoint机制

Flink-Checkpoint机制详解:(第41天)_flink checkpoint机制


系列文章目录

1、为什么要学Checkpoint机制
2、Flink怎么实现容错
3、Checkpoint机制的执行流程
4、重启策略Restart Strategy
5、状态后端State Backend
6、开源Flink案例

文章目录

  • 系列文章目录
  • 前言
      • 1、为什么要学Checkpoint机制
      • 2、Flink怎么实现容错
      • 3、Checkpoint机制的执行流程
      • 4、重启策略Restart Strategy
        • 4.1 不重启策略
        • 4.2 固定延迟重启策略
        • 4.3 失败率重启策略
        • 4.4 指数延迟重启策略
        • 4.5 小结
      • 5、状态后端State Backend
        • 5.1 内存状态后端MemoryStateBackend
        • 5.2 文件系统状态后端FsStateBackend
        • 5.3 RocksDB数据库状态后端RocksDBStateBackend
      • 6、开源Flink案例

前言

本文通过案例方式详解-Flink-Checkpoint机制

1、为什么要学Checkpoint机制

因为Flink是流式(实时)计算程序,我们工作中希望Flink程序能够7x24小时运行,同时遇到一些问题/bug以后,能够自动恢复程序的运行。

2、Flink怎么实现容错

Flink由于是实时运行的程序,因此不仅要对中间计算的数据进行容错,还需要对程序进行容错。也就是Flink中的容错分为如下两类:

  • 状态后端:对中间计算的数据进行容错
  • 重启策略:对程序进行容错,让程序能够自动恢复

3、Checkpoint机制的执行流程

Flink-Checkpoint机制详解:(第41天)_flink checkpoint机制

步骤如下:

Flink中Checkpoint执行流程:1- JobManager中的检查点协调器会将barrier栅栏发送给到source算子2- source算子接收到栅栏以后,先暂停对数据的处理工作,将算子运行的状态数据先保存到TaskManager上形成State状态数据;同时会向检查点协调器上报数据,在检查点协调器中获得到的数据称之为Checkpoint数据。数据上报完以后,才会恢复对数据的处理。3- 栅栏会随着数据从source算子一直流动到最后的sink算子4- 每个算子拿到栅栏以后的处理过程与source算子一样。也就是先暂停对数据的处理,在TaskManager上保存State状态数据,以及向检查点协调器汇报Checkpoint数据。然后才会继续处理数据5- 直到所有的算子将数据汇报完成,那么这个过程才算结束。

4、重启策略Restart Strategy

重启策略,能够让Flink程序在挂了之后进行自动重启。保证任务容错。既可以在代码中设置,也能够在配置文件中设置,一般推荐使用代码进行设置。

官网链接如下:

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/task_failure_recovery/

Flink有如下的几种重启策略。

  • 不重启策略:一般不用
  • 固定延迟重启策略:频繁使用
  • 失败率重启策略:频繁使用
  • 指数延迟重启策略:较少使用

官网文档如下:

Flink-Checkpoint机制详解:(第41天)_flink checkpoint机制

4.1 不重启策略

Flink程序不重启,如果遇到异常就挂了。

代码中配置:

env.set_restart_strategy(RestartStrategies.no_restart())

配置文件flink-conf.yaml中的配置:

restart-strategy: none
4.2 固定延迟重启策略

允许Flink程序固定可以重启几次。每次重启的时间间隔是多少。这些参数是自己指定的。

代码中配置:

env = StreamExecutionEnvironment.get_execution_environment()env.set_restart_strategy(RestartStrategies.fixed_delay_restart( 3, #重启的次数 10000 #延迟时间,这里配置的是10000毫秒))如果重启的次数超过了3次,那么不会再给你重启

配置文件flink-conf.yaml中的配置:

restart-strategy: fixed-delay #配置固定延迟重启restart-strategy.fixed-delay.attempts: 3 #重启的次数restart-strategy.fixed-delay.delay: 10 s #重启的间隔时间
4.3 失败率重启策略

在一定的时间范围内,重启的次数在允许范围内,那么会一直给你重启。

代码中配置:

env.set_restart_strategy(RestartStrategies.failure_rate_restart( 3, #间隔时间内重启的次数 300000, #时间间隔 10000 #延迟时间,这里配置的是10000毫秒))如果在300000毫秒统计时间以内,重启次数小于等于3次,那么会持续的给你进行重启;如果超过,不会再重启。

配置文件flink-conf.yaml中配置:

restart-strategy: failure-rate #配置失败率重启restart-strategy.failure-rate.max-failures-per-interval: 3 #最大重启的次数restart-strategy.failure-rate.failure-rate-interval: 5 min #失败率的时间间隔restart-strategy.failure-rate.delay: 10 s #每次重启的时间间隔
4.4 指数延迟重启策略

Flink程序的重启时间随着指数的增加而呈指数级别递增。注意:阿里云Flink中没有这种重启策略

Flink-Checkpoint机制详解:(第41天)_flink checkpoint机制

代码中配置:

Python暂不支持。

配置文件flink-conf.yaml中配置:

restart-strategy: exponential-delay#配置指数延迟重启restart-strategy.exponential-delay.initial-backoff: 10 s #重启的初始值restart-strategy.exponential-delay.max-backoff: 2 min#最大从重启时间间隔restart-strategy.exponential-delay.backoff-multiplier: 2.0 #指数restart-strategy.exponential-delay.reset-backoff-threshold: 10 min #重置重启时间restart-strategy.exponential-delay.jitter-factor: 0.1#重启因子,抖动因子
4.5 小结

工作中常用的重启策略:固定延迟重启策略,推荐使用失败率重启策略

重启策略中重启次数并不是设置的越多越好,一般推荐3-5次。

5、状态后端State Backend

用来保存Flink中State和Checkpoint的数据。

分类:

  • 内存状态后端:默认,一般在开发或者测试中使用
  • 文件系统状态后端:经常在生产环境使用,是用来存储Checkpoint数据
  • RocksDB状态后端:经常在生产环境使用,是用来存储State数据

同时,生产中一般都是文件系统状态后端和RocksDB状态后端一起配合使用。

5.1 内存状态后端MemoryStateBackend

这是Flink中默认的状态后端。

Flink-Checkpoint机制详解:(第41天)_flink checkpoint机制

代码中配置:

env.set_state_backend(HashMapStateBackend())env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage())

配置文件flink-conf.yaml中配置:

state.backend: hashmapstate.checkpoint-storage: jobmanager

内存状态后端,由于数据不安全,一般不用。

5.2 文件系统状态后端FsStateBackend

Flink-Checkpoint机制详解:(第41天)_flink checkpoint机制

代码中配置:

env.set_state_backend(HashMapStateBackend())env.get_checkpoint_config().set_checkpoint_storage_dir(\"file:///checkpoint-dir\")

配置文件flink-conf.yaml中的配置:

state.backend: hashmapstate.checkpoints.dir: file:///checkpoint-dir/state.checkpoint-storage: filesystem

文件系统状态后端一般工作中常用。

5.3 RocksDB数据库状态后端RocksDBStateBackend

Flink-Checkpoint机制详解:(第41天)_flink checkpoint机制

代码中配置:

env.set_state_backend(EmbeddedRocksDBStateBackend())env.get_checkpoint_config().set_checkpoint_storage_dir(\"file:///checkpoint-dir\")

配置文件flink-conf.yaml中的配置:

state.backend: rocksdbstate.checkpoints.dir: file:///checkpoint-dir/state.checkpoint-storage: filesystem

6、开源Flink案例

Checkpoint的配置一般都是固定不变的,可以配置在flink-conf.yaml文件中,这样配置完后,对所有任务都生效,如下:

建议操作前先保存一个node1的虚拟机快照,下面的操作,全部都在node1执行:

# 1.创建HDFS路径hdfs dfs -mkdir /checkpoints# 2.修改Flink配置文件cd /export/server/flink/confvim flink-conf.yaml# 3.要添加的内容如下注意: 配置文件中的配置项前面不要有#
execution.checkpointing.interval: 5000#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE execution.checkpointing.mode: EXACTLY_ONCEstate.backend: hashmap#设置checkpoint的存储方式state.checkpoint-storage: filesystem#设置checkpoint的存储位置state.checkpoints.dir: hdfs://node1:8020/checkpoints#设置savepoint的存储位置state.savepoints.dir: hdfs://node1:8020/checkpoints#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃execution.checkpointing.timeout: 600000#设置两次checkpoint之间的最小时间间隔execution.checkpointing.min-pause: 500#设置并发checkpoint的数目execution.checkpointing.max-concurrent-checkpoints: 1#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个state.checkpoints.num-retained: 3#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动#清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION#第二种:固定延迟重启策略#设置重启策略restart-strategy: fixed-delay#尝试重启次数restart-strategy.fixed-delay.attempts: 3#两次连续重启的间隔时间restart-strategy.fixed-delay.delay: 10 s
# 4.改完配置以后,需要重启Flink集群。另外需要注意我们使用了HDFS。cd /export/server/flink/bin./stop-cluster.sh./start-cluster.sh

Python代码:

#1.构建流式执行环境from pyflink.common import Typesfrom pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, DataStreamfrom pyflink.table import DataTypesenv = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(1)# env.set_runtime_mode(RuntimeExecutionMode.STREAMING)#2.数据sourceinput_ds = DataStream(env._j_stream_execution_environment.socketTextStream(\"node1\",9999))#3.数据处理def map_word(word): if word == \"error\": raise ValueError(\"出异常了,程序挂了...\") else: return (word,1)result_ds = input_ds.flat_map(lambda x:x.split(\" \"))\\ .map(lambda word:map_word(word),output_type=Types.TUPLE([Types.STRING(),Types.INT()])).\\ key_by(lambda x:x[0])\\ .reduce(lambda x,y:(x[0],x[1] + y[1]))#4.数据Sinkresult_ds.print()#5.启动流式任务env.execute()

Flink-Checkpoint机制详解:(第41天)_flink checkpoint机制

部署python代码的命令:/export/server/flink/bin/flink run -py /export/software/checkpoint_demo.py
需要在node1上启动ncnc -lk 9999

运行结果截图:

Flink-Checkpoint机制详解:(第41天)_flink checkpoint机制