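Savepoints
Configuration
The configuration lives in the file flink/conf/flink-conf.yaml (on Mac OS X with a Homebrew install, it is /usr/local/Cellar/apache-flink/1.1.3/libexec/conf/flink-conf.yaml).
Flink < 1.2: The configuration is very similar to the checkpoint configuration (covered in a separate topic). The only difference is that it makes no sense to define an in-memory savepoint backend, since savepoints need to persist after Flink shuts down.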
# Supported backends: filesystem, <class-name-of-factory>
savepoints.state.backend: filesystem
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# (or any local file system under Windows), or "S3://" for S3 file system.
# Note: must be accessible from the JobManager and all TaskManagers !
savepoints.state.backend.fs.checkpointdir: file:///tmp/flink-backend/savepoints
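Note: if no backend is specified, the default backend is jobmanager, which means your savepoints disappear once the cluster is shut down. This is useful for debugging only.
Flink 1.2+: As explained in the corresponding JIRA ticket, allowing a savepoint to be stored in the jobmanager's memory makes little sense. Since Flink 1.2, savepoints are always stored in files. The configuration above has been replaced by: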
# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints
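Usage
Getting the job ID
To trigger a savepoint, all you need is the job ID of the application. The job ID is printed on the command line when you launch the job, or it can be retrieved later with flink list: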
flink list
Retrieving JobManager.
Using address localhost/127.0.0.1:6123 to connect to JobManager.
------------------ Running/Restarting Jobs -------------------
17.03.2017 11:44:03 : 196b8ce6788d0554f524ba747c4ea54f : CheckpointExample (RUNNING)
No scheduled jobs.
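For scripting, the job ID can be pulled out of that output instead of being copied by hand. The following is a minimal sketch, assuming the output keeps the "<date> : <jobID> : <name> (<status>)" layout shown above. The function name job_id_for is hypothetical and not part of Flink.

```shell
# Hypothetical helper (not part of Flink): reads `flink list` output on stdin
# and prints the ID of every job whose name equals the first argument.
# Assumes job lines shaped like "<date> : <jobID> : <name> (<status>)".
job_id_for() {
  awk -v name="$1" -F ' : ' '
    NF >= 3 {
      job = $3
      sub(/ \(.*$/, "", job)   # drop the trailing " (STATUS)" part
      if (job == name) print $2
    }'
}

# Usage (assumes `flink` is on the PATH):
#   flink list | job_id_for CheckpointExample
```

Matching on the job name is a design choice for this sketch; if several running jobs share a name, it prints one ID per line and the caller has to pick.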
Triggering a savepoint
To trigger a savepoint, use flink savepoint <jobID>:
flink savepoint 196b8ce6788d0554f524ba747c4ea54f
Retrieving JobManager.
Using address /127.0.0.1:6123 to connect to JobManager.
Triggering savepoint for job 196b8ce6788d0554f524ba747c4ea54f.
Waiting for response...
Savepoint completed. Path: file:/tmp/flink-backend/savepoints/savepoint-a40111f915fc
You can resume your program from this savepoint with the run command.
Note that you can also provide a target directory as a second argument; it overrides the default directory defined in flink/conf/flink-conf.yaml.
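Since the savepoint path is needed later to resume the job, it can be handy to capture it from the command output. This is a minimal sketch, assuming the "Savepoint completed. Path: ..." line keeps the wording shown in the output above. The function name savepoint_path is hypothetical and not part of Flink.

```shell
# Hypothetical helper (not part of Flink): reads `flink savepoint` output on
# stdin and prints only the savepoint path.
# Assumes a line of the form "Savepoint completed. Path: <path>".
savepoint_path() {
  sed -n 's/^Savepoint completed\. Path: //p'
}

# Usage (assumes `flink` is on the PATH):
#   flink savepoint 196b8ce6788d0554f524ba747c4ea54f | savepoint_path
```

If the savepoint fails, no matching line is printed and the helper outputs nothing, so callers should check for an empty result.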
In Flink 1.2+, it is also possible to cancel a job and take a savepoint at the same time, using the -s option:
flink cancel -s 196b8ce6788d0554f524ba747c4ea54f # use default savepoints dir
flink cancel -s hdfs:///savepoints 196b8ce6788d0554f524ba747c4ea54f # specify target dir
Note: a savepoint can be moved, but do not rename it!
Resuming from a savepoint
To resume from a specific savepoint, use the -s [savepoint-dir] option of the flink run command:
flink run -s /tmp/flink-backend/savepoints/savepoint-a40111f915fc app.jar
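The trigger and resume steps can be chained when deploying a new version of a job. The following is only a sketch under stated assumptions: the function name upgrade_job is hypothetical, the savepoint output is assumed to contain the "Savepoint completed. Path: ..." line shown earlier, and flink is assumed to be on the PATH. It uses only the savepoint, cancel, and run -s subcommands.

```shell
# Hypothetical helper (not part of Flink): take a savepoint of a running job,
# cancel it, then start a new jar from that savepoint.
#   $1 = job ID of the running job, $2 = path to the new jar
upgrade_job() {
  up_job_id="$1"
  up_jar="$2"

  # Keep only the path from the "Savepoint completed. Path: ..." line.
  up_path=$(flink savepoint "$up_job_id" | sed -n 's/^Savepoint completed\. Path: //p')

  # Without a savepoint path, do not touch the running job.
  if [ -z "$up_path" ]; then
    echo "no savepoint path found; leaving job $up_job_id running" >&2
    return 1
  fi

  # Stop the old job, then resume the new jar from the savepoint.
  flink cancel "$up_job_id" && flink run -s "$up_path" "$up_jar"
}

# Usage:
#   upgrade_job 196b8ce6788d0554f524ba747c4ea54f app.jar
```

The empty-path check is the important part of the design: the old job is cancelled only once a savepoint is known to exist.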
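Specifying operator UIDs
To be able to resume from a savepoint after a code change, you must make sure that the new code uses the same UIDs for its operators. To assign a UID manually, call .uid(<name>) right after the operator: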
env
    .addSource(source)
    .uid(className + "-KafkaSource01")      // stable ID for the source's state
    .rebalance()
    .keyBy((node) -> node.get("key").asInt())
    .flatMap(new StatefulMapper())
    .uid(className + "-StatefulMapper01")   // stable ID for the mapper's state
    .print();