Savepoints
Configuration
The configuration lives in the file flink/conf/flink-conf.yaml (on Mac OS X with a Homebrew install, it is /usr/local/Cellar/apache-flink/1.1.3/libexec/conf/flink-conf.yaml).
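Flink < 1.2: The configuration is very similar to the checkpoint configuration (covered in its own topic). The only difference is that it makes no sense to define an in-memory savepoint backend, since savepoints need to persist after Flink shuts down.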
# Supported backends: filesystem, <class-name-of-factory>
savepoints.state.backend: filesystem
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# (or any local file system under Windows), or "S3://" for S3 file system.
# Note: must be accessible from the JobManager and all TaskManagers !
savepoints.state.backend.fs.checkpointdir: file:///tmp/flink-backend/savepoints
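Note: If no backend is specified, the default backend is jobmanager, which means your savepoints disappear as soon as the cluster is shut down. This is only useful for debugging.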
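Flink 1.2+: As explained in this JIRA ticket, allowing a savepoint to be kept in the jobmanager's memory makes little sense. Since Flink 1.2, savepoints are always stored in files. The configuration above has been replaced by: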
# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints
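Usage
Getting the job ID
To trigger a savepoint, all you need is the job ID of the application. The job ID is printed on the command line when you launch the job, and it can be retrieved later with flink list: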
flink list
Retrieving JobManager.
Using address localhost/127.0.0.1:6123 to connect to JobManager.
------------------ Running/Restarting Jobs -------------------
17.03.2017 11:44:03 : 196b8ce6788d0554f524ba747c4ea54f : CheckpointExample (RUNNING)
No scheduled jobs.
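When scripting around the CLI, the job ID can be pulled out of this output with a regular expression. The following is a minimal Java sketch, not part of Flink: the class JobIdExtractor and its method are hypothetical names, and it assumes the output format shown above, where each job line carries a 32-character hexadecimal ID between " : " separators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Flink API): extracts job IDs from `flink list` output.
class JobIdExtractor {
    // Assumption: a job ID is 32 lowercase hex characters surrounded by " : ".
    private static final Pattern JOB_LINE =
            Pattern.compile(" : ([0-9a-f]{32}) : ");

    // Returns every job ID found in the given output, in order of appearance.
    static List<String> extractJobIds(String flinkListOutput) {
        List<String> ids = new ArrayList<>();
        Matcher m = JOB_LINE.matcher(flinkListOutput);
        while (m.find()) {
            ids.add(m.group(1));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Sample text copied from the `flink list` output above.
        String output = String.join("\n",
                "------------------ Running/Restarting Jobs -------------------",
                "17.03.2017 11:44:03 : 196b8ce6788d0554f524ba747c4ea54f : CheckpointExample (RUNNING)",
                "No scheduled jobs.");
        System.out.println(extractJobIds(output));
    }
}
```

If the output format differs in your Flink version, the pattern would need to be adjusted accordingly.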
Triggering a savepoint
To trigger a savepoint, use flink savepoint <jobID>:
flink savepoint 196b8ce6788d0554f524ba747c4ea54f
Retrieving JobManager.
Using address /127.0.0.1:6123 to connect to JobManager.
Triggering savepoint for job 196b8ce6788d0554f524ba747c4ea54f.
Waiting for response...
Savepoint completed. Path: file:/tmp/flink-backend/savepoints/savepoint-a40111f915fc
You can resume your program from this savepoint with the run command.
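Note that you can also pass a target directory as a second argument; it overrides the default directory defined in flink/conf/flink-conf.yaml.
In Flink 1.2+, it is also possible to cancel a job and take a savepoint at the same time with the -s option: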
flink cancel -s 196b8ce6788d0554f524ba747c4ea54f # use default savepoints dir
flink cancel -s hdfs:///savepoints 196b8ce6788d0554f524ba747c4ea54f # specify target dir
Note: A savepoint can be moved, but do not rename it!
Resuming from a savepoint
To resume from a specific savepoint, use the -s [savepoint-dir] option of the flink run command:
flink run -s /tmp/flink-backend/savepoints/savepoint-a40111f915fc app.jar
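To automate the trigger-then-resume cycle, the savepoint path can be read from the "Savepoint completed. Path:" line printed by flink savepoint and handed to flink run -s. The sketch below is one possible way to do this in Java. The class SavepointResume and its methods are hypothetical, it assumes the CLI output format shown earlier, and it only builds the command without executing it.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Flink API): turns `flink savepoint` output into a resume command.
class SavepointResume {
    // Assumption: the CLI reports the location as "Savepoint completed. Path: <path>".
    private static final Pattern SAVEPOINT_PATH =
            Pattern.compile("Savepoint completed\\. Path: (\\S+)");

    // Returns the savepoint path if the output contains a completion line.
    static Optional<String> parseSavepointPath(String output) {
        Matcher m = SAVEPOINT_PATH.matcher(output);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Builds the argument list for `flink run -s <savepoint> <jar>`.
    static List<String> resumeCommand(String savepointPath, String jarPath) {
        return List.of("flink", "run", "-s", savepointPath, jarPath);
    }

    public static void main(String[] args) {
        // Sample text copied from the `flink savepoint` output shown earlier.
        String output = "Waiting for response...\n"
                + "Savepoint completed. Path: file:/tmp/flink-backend/savepoints/savepoint-a40111f915fc\n"
                + "You can resume your program from this savepoint with the run command.";
        parseSavepointPath(output)
                .map(path -> String.join(" ", resumeCommand(path, "app.jar")))
                .ifPresent(System.out::println);
    }
}
```

The returned list can be passed to java.lang.ProcessBuilder as is, which avoids quoting problems when the path contains unusual characters.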
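Specifying operator UIDs
To be able to resume from a savepoint after a code change, you must make sure the new code uses the same UIDs for its operators. To assign a UID manually, call the .uid(<name>) function directly after the operator: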
env
.addSource(source)
.uid(className + "-KafkaSource01")
.rebalance()
.keyBy((node) -> node.get("key").asInt())
.flatMap(new StatefulMapper())
.uid(className + "-StatefulMapper01")
.print();