Savepoints

Configuration

The configuration lives in the file flink/conf/flink-conf.yaml (on Mac OS X with Homebrew, it is /usr/local/Cellar/apache-flink/1.1.3/libexec/conf/flink-conf.yaml).

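Flink < 1.2: the configuration is very similar to the checkpoint configuration (see the checkpoints topic). The only difference is that an in-memory savepoint backend makes no sense, because savepoints must survive a Flink shutdown.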

# Supported backends: filesystem, <class-name-of-factory>
savepoints.state.backend: filesystem
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# (or any local file system under Windows), or "S3://" for S3 file system.
# Note: must be accessible from the JobManager and all TaskManagers !
savepoints.state.backend.fs.checkpointdir: file:///tmp/flink-backend/savepoints

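Note: if no backend is specified, the default backend is jobmanager, which means your savepoints disappear as soon as the cluster shuts down. This is only useful for debugging.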

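Flink 1.2+: as explained in this JIRA ticket, allowing savepoints to be stored in the jobmanager's memory makes no sense. Since Flink 1.2, savepoints are always stored in files. The configuration above has been replaced by: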

# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints
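To make the version difference concrete, here is a minimal sketch of a helper that looks up the default savepoint directory in the text of a flink-conf.yaml file. It prefers the Flink 1.2+ key state.savepoints.dir and falls back to the pre-1.2 key savepoints.state.backend.fs.checkpointdir. The class SavepointConf is hypothetical, and it assumes the file contains only flat "key: value" lines and # comments, as in the snippets above, not arbitrary YAML.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not part of Flink): reads flat "key: value" lines of a
// flink-conf.yaml and returns the configured default savepoint directory, if any.
final class SavepointConf {
    static final String KEY_1_2_PLUS = "state.savepoints.dir";
    static final String KEY_PRE_1_2 = "savepoints.state.backend.fs.checkpointdir";

    static Map<String, String> parse(String yaml) {
        Map<String, String> conf = new HashMap<>();
        for (String raw : yaml.split("\\R")) {
            String line = raw.strip();
            // Skip blank lines and full-line comments.
            if (line.isEmpty() || line.startsWith("#")) continue;
            // Split on the first colon only, so values like "file:///tmp" stay intact.
            int colon = line.indexOf(':');
            if (colon < 0) continue;
            conf.put(line.substring(0, colon).strip(), line.substring(colon + 1).strip());
        }
        return conf;
    }

    static Optional<String> savepointDir(String yaml) {
        Map<String, String> conf = parse(yaml);
        // Prefer the Flink 1.2+ key, fall back to the pre-1.2 key.
        if (conf.containsKey(KEY_1_2_PLUS)) return Optional.of(conf.get(KEY_1_2_PLUS));
        return Optional.ofNullable(conf.get(KEY_PRE_1_2));
    }
}
```

An empty result corresponds to the case where no directory is configured and one must be passed on the command line.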

Usage

Getting the job ID

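To trigger a savepoint, all you need is the job ID of the application. The job ID is printed on the command line when you start the job, or you can retrieve it later with flink list: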

flink list
Retrieving JobManager.
Using address localhost/127.0.0.1:6123 to connect to JobManager.
------------------ Running/Restarting Jobs -------------------
17.03.2017 11:44:03 : 196b8ce6788d0554f524ba747c4ea54f : CheckpointExample (RUNNING)
No scheduled jobs.
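If you script savepoints, the job ID can be extracted from the flink list output. The following is a minimal sketch that assumes the "date : jobID : name (STATE)" line layout shown above; the class FlinkListParser is hypothetical, and the output format may differ between Flink versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts job IDs from `flink list` lines of the form
// "17.03.2017 11:44:03 : <32 hex chars> : <job name> (<STATE>)".
final class FlinkListParser {
    private static final Pattern JOB_LINE =
            Pattern.compile("^.+? : ([0-9a-f]{32}) : (.+) \\((\\w+)\\)$");

    /** Returns the IDs of all jobs listed with the given state, e.g. "RUNNING". */
    static List<String> jobIds(String output, String state) {
        List<String> ids = new ArrayList<>();
        for (String line : output.split("\\R")) {
            Matcher m = JOB_LINE.matcher(line.strip());
            // Header, separator and log lines simply do not match the pattern.
            if (m.matches() && m.group(3).equals(state)) {
                ids.add(m.group(1));
            }
        }
        return ids;
    }
}
```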

Triggering a savepoint

To trigger a savepoint, use flink savepoint <jobID>:

flink savepoint 196b8ce6788d0554f524ba747c4ea54f
Retrieving JobManager.
Using address /127.0.0.1:6123 to connect to JobManager.
Triggering savepoint for job 196b8ce6788d0554f524ba747c4ea54f.
Waiting for response...
Savepoint completed. Path: file:/tmp/flink-backend/savepoints/savepoint-a40111f915fc
You can resume your program from this savepoint with the run command.
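The savepoint path needed later for flink run -s can be taken from the "Savepoint completed. Path: ..." line. This is a minimal sketch that assumes exactly that wording, which is not guaranteed to stay stable across Flink versions; SavepointOutput is a hypothetical name.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the savepoint path from `flink savepoint` output.
final class SavepointOutput {
    private static final Pattern COMPLETED =
            Pattern.compile("Savepoint completed\\. Path: (\\S+)");

    static Optional<String> path(String output) {
        Matcher m = COMPLETED.matcher(output);
        // find() scans the whole output, so surrounding log lines are ignored.
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }
}
```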

Note that you can also pass a target directory as a second argument; it overrides the default directory defined in flink/conf/flink-conf.yaml.

In Flink 1.2+, you can also cancel a job and take a savepoint at the same time with the -s option:

flink cancel -s 196b8ce6788d0554f524ba747c4ea54f # use default savepoints dir
flink cancel -s hdfs:///savepoints 196b8ce6788d0554f524ba747c4ea54f # specify target dir
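When driving the CLI from code, the position of the optional target directory differs between the two commands above: it comes after the job ID for flink savepoint, but directly after -s for flink cancel. Below is a minimal sketch of hypothetical helpers (FlinkCommands is not a Flink class) that only assemble the argument lists; they do not run Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers that assemble `flink` CLI arguments.
// A null targetDir means "use the default directory from flink-conf.yaml".
final class FlinkCommands {
    /** flink savepoint <jobID> [targetDirectory] */
    static List<String> savepoint(String jobId, String targetDir) {
        List<String> cmd = new ArrayList<>(List.of("flink", "savepoint", jobId));
        if (targetDir != null) cmd.add(targetDir); // target dir is the second argument
        return cmd;
    }

    /** flink cancel -s [targetDirectory] <jobID> (Flink 1.2+) */
    static List<String> cancelWithSavepoint(String jobId, String targetDir) {
        List<String> cmd = new ArrayList<>(List.of("flink", "cancel", "-s"));
        if (targetDir != null) cmd.add(targetDir); // target dir follows -s directly
        cmd.add(jobId);
        return cmd;
    }
}
```

The resulting list can be passed to java.lang.ProcessBuilder, for example new ProcessBuilder(FlinkCommands.savepoint(jobId, null)).inheritIO().start(), assuming the flink binary is on the PATH.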

Note: a savepoint can be moved, but do not rename it!

Resuming from a savepoint

To resume from a specific savepoint, use the -s [savepoint-dir] option of the flink run command:

flink run -s /tmp/flink-backend/savepoints/savepoint-a40111f915fc app.jar
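One detail worth getting right when building this command programmatically: flink run takes its options before the jar file, and anything after the jar is passed to your program rather than interpreted by Flink. A minimal sketch of a hypothetical helper (ResumeCommand is not part of Flink) that keeps -s in front of the jar:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds `flink run -s <savepointPath> <jar> [programArgs...]`.
// Flink options such as -s must precede the jar; arguments after the jar
// are handed to the user program.
final class ResumeCommand {
    static List<String> run(String savepointPath, String jar, String... programArgs) {
        List<String> cmd = new ArrayList<>(List.of("flink", "run", "-s", savepointPath, jar));
        cmd.addAll(List.of(programArgs));
        return cmd;
    }
}
```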

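Specifying operator UIDs

To be able to resume from a savepoint after changing your code, you must make sure the new code uses the same UIDs for the operators. To assign a UID manually, call the .uid(<name>) function immediately after the operator: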

env
    .addSource(source)
    .uid(className + "-KafkaSource01")      // stable UID for the source operator
    .rebalance()
    .keyBy((node) -> node.get("key").asInt())
    .flatMap(new StatefulMapper())
    .uid(className + "-StatefulMapper01")   // stable UID for the stateful flatMap
    .print();
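Before redeploying changed code, it can help to check that every operator UID used by the old job is still assigned in the new one. The sketch below is a hypothetical, stdlib-only check over two sets of UID strings that you would maintain yourself (for example, the className + "-..." names above); it does not read Flink savepoint files. A UID that disappears means the state stored under it has no operator to be restored into.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical pre-deployment check: which operator UIDs from the old job
// are no longer assigned in the new job?
final class UidCheck {
    static Set<String> missingUids(Set<String> oldUids, Set<String> newUids) {
        Set<String> missing = new TreeSet<>(oldUids); // sorted for readable reports
        missing.removeAll(newUids);
        return missing;
    }
}
```

Renaming "-StatefulMapper01" to "-StatefulMapper02", for instance, would report "-StatefulMapper01" as missing.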