配置和设置
检查点配置分两步完成。首先,你需要选择后端。然后,你可以在每个应用程序的基础上指定检查点的间隔和模式。
后端
可用的后端
存储检查点的位置取决于配置的后端:
MemoryStateBackend
:内存状态,备份到 JobManager 的/ ZooKeeper 的内存。应仅用于最小状态(默认为最大 5 MB,例如用于存储 Kafka 偏移)或测试和本地调试。FsStateBackend
:状态保存在 TaskManagers 的内存中,状态快照(即检查点)存储在文件系统(HDFS,DS3,本地文件系统……)中。鼓励对大状态或长窗口以及高可用性设置进行此设置。RocksDBStateBackend
:保存 RocksDB 数据库中的飞行中数据,该数据库(默认情况下)存储在 TaskManager 数据目录中。在检查点时,整个 RocksDB 数据库被写入文件(如上所述)。与 FsStateBackend 相比,它允许更大的状态(仅受磁盘空间与任务管理器内存大小的限制),但吞吐量将更低(数据不总是在内存中,必须从光盘加载)。
请注意,无论后端如何,元数据(检查点数量,本地化等)始终存储在作业管理器内存中,并且在应用程序终止/取消后检查点将不会保留。
指定后端
你可以使用以下命令在程序的 main
方法中指定后端:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
或者在 flink/conf/flink-conf.yaml
中设置默认后端:
# Supported backends:
# - jobmanager (MemoryStateBackend),
# - filesystem (FsStateBackend),
# - rocksdb (RocksDBStateBackend),
# - <class-name-of-factory>
state.backend: filesystem
# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# "S3://" for S3 file system.
state.backend.fs.checkpointdir: file:///tmp/flink-backend/checkpoints
启用检查点
每个应用程序都需要明确启用检查点:
long checkpointInterval = 5000; // every 5 seconds
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(checkpointInterval);
你可以选择指定检查点模式。如果没有,则默认为一次 :
env.enableCheckpointing(checkpointInterval, CheckpointingMode.AT_LEAST_ONCE);
检查点模式定义了系统在出现故障时提供的一致性。当激活检查点时,重放数据流,以便重复丢失的处理部分。使用 EXACTLY_ONCE
,系统绘制检查点,使得恢复的行为就像操作符/函数恰好一次看到每条记录一样。使用 AT_LEAST_ONCE
,检查点以更简单的方式绘制,通常在恢复时遇到一些重复。