Flink provides two fault-tolerance mechanisms for different scenarios: Checkpoint and Savepoint.
This article summarizes how to use both.
Checkpoint
A checkpoint stores the job's state data. It is triggered and cleaned up periodically by Flink itself, is lightweight and fast, and is mainly used for failure recovery within a job's lifecycle.
Checkpoint Configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
CheckpointConfig checkpointConfig = env.getCheckpointConfig();

// 1. Enable checkpointing
// Checkpointing is disabled by default; setting a checkpoint interval (in milliseconds)
// greater than 0 enables it. If the state is large, consider increasing this value.
checkpointConfig.setCheckpointInterval(10L * 1000);

// 2. Set the state backend
// The default is MemoryStateBackend. Three backends are supported:
// MemoryStateBackend: keeps state in the JVM heap; generally not used in production.
// FsStateBackend: stores state on a file system, either local or HDFS.
// RocksDBStateBackend: stores state in RocksDB; requires an extra dependency (see the sketch after this block).
// true: write checkpoints asynchronously
env.setStateBackend((StateBackend) new FsStateBackend("CheckpointDir", true));

// 3. Set the checkpointing mode
// EXACTLY_ONCE: exactly once, results are neither lost nor duplicated.
// AT_LEAST_ONCE: at least once, results may be duplicated.
// Note: end-to-end exactly-once (End-To-End EXACTLY_ONCE) also requires the source and sink
// to support EXACTLY_ONCE, in addition to this setting.
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 4. Retain the checkpoint directory after the job is cancelled
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 5. Set the number of checkpoints to retain
// Configured in conf/flink-conf.yaml. The default is 1, i.e. only the latest checkpoint is kept.
// If you may need to restore from an earlier point in time, raise it according to the checkpoint interval:
// state.checkpoints.num-retained: 20

// 6. Set the checkpoint timeout
// The default is 10 minutes. If a checkpoint takes longer than this, Flink discards it and marks it
// as failed; this is visible on the Checkpoints page of the Flink Web UI.
checkpointConfig.setCheckpointTimeout(10 * 60 * 1000);

// 7. Set the minimum pause between checkpoints (milliseconds, default 0)
// If the state is large, a single checkpoint takes long, and the interval is short, checkpoints pile up,
// occupy a lot of compute resources, and hurt job performance.
checkpointConfig.setMinPauseBetweenCheckpoints(30000);

// 8. Set the maximum number of concurrent checkpoints (default 1)
checkpointConfig.setMaxConcurrentCheckpoints(1);
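As mentioned in item 2 above, RocksDBStateBackend needs an additional dependency. The following is a minimal sketch of switching the job to RocksDB, assuming the flink-statebackend-rocksdb dependency has been added to the project; the HDFS path reuses the checkpoint directory from this article and is only illustrative.

// Requires the flink-statebackend-rocksdb dependency;
// class: org.apache.flink.contrib.streaming.state.RocksDBStateBackend
// The constructor declares IOException, so the calling method must declare or handle it.
// The second argument (true) enables incremental checkpoints (only changed state is uploaded).
RocksDBStateBackend rocksDbBackend =
        new RocksDBStateBackend("hdfs://bigdata-cluster:8020/data/flink/checkpoint", true);
env.setStateBackend((StateBackend) rocksDbBackend);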
Checkpoint Directory Structure
# Using FsStateBackend as an example, inspect the checkpoint directory structure
hdfs dfs -get /data/flink/checkpoint/c6b137944167dd0db822a3b961c25b64

tree c6b137944167dd0db822a3b961c25b64/
c6b137944167dd0db822a3b961c25b64/
├── chk-30
│   ├── 1849dc63-05ee-4990-845d-2da9416de805
│   ├── 1e78cdef-fff9-4e19-af0e-c0cc6fc687d3
│   ├── b47f5fd5-fcc1-48d7-9634-f0aa91f03662
│   ├── c7933c2d-fcdb-48a3-bcd2-6d7200ec3c7d
│   ├── c86bd78e-9f9e-4dcc-8111-5b86679f9c9c
│   ├── f3617a4e-c084-4632-8517-1a1d5b09ae08
│   ├── fe39aebb-5c52-49f5-905a-32a8601fd8f5
│   └── _metadata
├── chk-31
│   ├── 2c142723-5758-4286-85ab-72cd7788391d
│   ├── 45e7c99a-63db-4373-9f9b-7f072e3267ac
│   ├── 8cf1b3be-5f40-41a7-acba-f0dbe882f27f
│   ├── b1f8264c-b101-4f19-b22d-6c809a7de439
│   ├── be82e45d-832a-4e46-abb4-d67077fbabd8
│   ├── cdc6e132-794c-44bd-a79d-2bfe72dbfadb
│   ├── da0b4742-d272-45f2-a86a-fdf3aec86e08
│   └── _metadata
├── shared
└── taskowned

# taskowned: state owned by the TaskManagers
# shared: state shared across checkpoints
# chk-30 / chk-31: metadata and data of individual checkpoints, e.g. the files under chk-30:
# chk-30/_metadata: the checkpoint's metadata file, which records the paths of the checkpoint data files
# chk-30/1849dc63-05ee-4990-845d-2da9416de805 ...: checkpoint data files holding the actual state,
#   e.g. KafkaConsumer state (per-partition offsets) or KafkaProducer state (transaction state)
Restoring from a Checkpoint
When a job has been cancelled, has failed, or needs to reprocess data from some point in the past, its state can be restored from a checkpoint via --fromSavepoint.
Restoring from a Checkpoint in a New Yarn Application
applicationName=bigdata_flink

/data/software/flink-1.8.0/bin/flink run \
--jobmanager yarn-cluster \
--yarnname ${applicationName} \
--yarnqueue default \
--yarnjobManagerMemory 1024 \
--yarntaskManagerMemory 1024 \
--yarncontainer 2 \
--yarnslots 2 \
--parallelism 2 \
--fromSavepoint hdfs://bigdata-cluster:8020/data/flink/checkpoint/c6b137944167dd0db822a3b961c25b64/chk-252 \
--class com.bigdata.flink.ReadWriteKafka \
bigData-1.0-SNAPSHOT.jar \
--applicationProperties application.properties
Restoring from a Checkpoint in an Existing Yarn Application
/data/software/flink-1.8.0/bin/flink run \
--yarnapplicationId application_1559561472125_0021 \
--fromSavepoint hdfs://bigdata-cluster:8020/data/flink/checkpoint/c6b137944167dd0db822a3b961c25b64/chk-252 \
--allowNonRestoredState \
--class com.bigdata.flink.ReadWriteKafka \
bigData-1.0-SNAPSHOT.jar \
--applicationProperties application.properties
Savepoint
A savepoint is a special kind of checkpoint built on the checkpointing mechanism. It takes a full snapshot of the job's state to external storage, must be triggered manually, and is not cleaned up automatically. It is mainly used to restore a job across job versions or across Flink versions.
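If savepoints are triggered often, it can be convenient to configure a default savepoint target directory once in conf/flink-conf.yaml, so the target directory can be omitted from the CLI commands shown later; the value below is only an example reusing the HDFS path from this article.

# conf/flink-conf.yaml (example value)
state.savepoints.dir: hdfs://bigdata-cluster:8020/data/flink/savepoint/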
Configuring Operator IDs
By default, an operator's ID is generated from the JobGraph by hashing certain properties of the operator. Changes to the JobGraph (e.g. reordering operators, changing parallelism, modifying code logic) can therefore produce new operator IDs, which makes it hard to restore the job from saved state. In that case, set a uid manually so that each operator has a stable ID, as in the snippet below and the fuller sketch that follows it.
DataStream<String> source = env
        .addSource(kafkaConsumer)
        .name("KafkaSource")
        // set the operator ID
        .uid("source-id");
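The same applies to every other stateful operator in the job, not just the source. The sketch below continues the pipeline above; ParseFunction and kafkaProducer are hypothetical stand-ins for the job's own map function and Kafka sink, used only to illustrate assigning a uid at each step.

// Hypothetical continuation: give every stateful operator a stable uid
// so its state can still be matched after the job graph changes.
DataStream<String> parsed = source
        .map(new ParseFunction())   // ParseFunction: a hypothetical MapFunction<String, String>
        .name("ParseRecord")
        .uid("parse-id");

parsed.addSink(kafkaProducer)       // kafkaProducer: the job's Kafka sink, assumed defined elsewhere
        .name("KafkaSink")
        .uid("sink-id");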
Triggering a Savepoint
Manually Triggering a Savepoint
For a Flink job running in Yarn per-job mode, a savepoint can be triggered with the following command:
# Syntax:
bin/flink savepoint --yarnapplicationId yarnAppId flinkJobId savepointTargetDirectory

# Example:
bin/flink savepoint --yarnapplicationId application_1559561472125_0020 d904f6adf8a3e94d70465fc717a3f30b hdfs://bigdata-cluster:8020/data/flink/savepoint/
Cancelling a Job with a Savepoint
For a Flink job running in Yarn per-job mode, the following command cancels the job and triggers a savepoint:
# Syntax:
bin/flink cancel --yarnapplicationId yarnAppId --withSavepoint savepointTargetDirectory flinkJobId

# Example:
bin/flink cancel --yarnapplicationId application_1559561472125_0020 --withSavepoint hdfs://bigdata-cluster:8020/data/flink/savepoint/ d904f6adf8a3e94d70465fc717a3f30b
Savepoint Directory Structure
hdfs dfs -get hdfs://bigdata-cluster:8020/data/flink/savepoint/savepoint-f793f8-3d4e669257fa/ ./

tree savepoint-f793f8-3d4e669257fa/
savepoint-f793f8-3d4e669257fa/
├── 03ea1c3a-9243-45ba-8613-43911e34e007
├── 380c016c-b0b1-4a92-a3cd-36e7503b0f48
├── 77241d6d-5c34-49b1-97dc-79edeeb217b9
├── 83aa014c-dff8-490e-b132-227fb89c84eb
├── 9696f63c-3d88-4066-9b13-d479dc2ff26e
├── d02828fc-1591-4241-9cda-adba649d1cef
├── da749c45-db28-4a66-8876-67b84c3240e9
└── _metadata
Restoring from a Savepoint
Restoring from a Savepoint in a New Yarn Application
applicationName=bigdata_flink

/data/software/flink-1.8.0/bin/flink run \
--jobmanager yarn-cluster \
--yarnname ${applicationName} \
--yarnqueue default \
--yarnjobManagerMemory 1024 \
--yarntaskManagerMemory 1024 \
--yarncontainer 2 \
--yarnslots 2 \
--parallelism 2 \
--fromSavepoint hdfs://bigdata-cluster:8020/data/flink/savepoint/savepoint-d904f6-d06500edc454 \
--class com.bigdata.flink.ReadWriteKafka \
bigData-1.0-SNAPSHOT.jar \
--applicationProperties application.properties
Restoring from a Savepoint in an Existing Yarn Application
/data/software/flink-1.8.0/bin/flink run \
--yarnapplicationId application_1559561472125_0021 \
--fromSavepoint hdfs://bigdata-cluster:8020/data/flink/savepoint/savepoint-f793f8-3d4e669257fa \
--allowNonRestoredState \
--class com.bigdata.flink.ReadWriteKafka \
bigData-1.0-SNAPSHOT.jar \
--applicationProperties application.properties
Deleting a Savepoint
# Syntax:
/data/software/flink-1.8.0/bin/flink savepoint --dispose savepointDirectory

# Example:
/data/software/flink-1.8.0/bin/flink savepoint --dispose hdfs://bigdata-cluster:8020/data/flink/savepoint/savepoint-f793f8-3d4e669257fa