外部檢查點(Flink 1.2)
在 1.2 之前,在作業終止/取消/持續故障之後保持狀態/保留檢查點的唯一方法是通過手動觸發的儲存點。版本 1.2 引入了持久檢查點。
持久檢查點的行為與常規定期檢查點非常相似,但以下區別除外:
- 他們將後設資料儲存到持久儲存(如儲存點)。
- 當擁有的工作永久失敗時,它們不會被丟棄。此外,它們可以配置為在取消作業時不被丟棄。
因此它與儲存點非常相似; 實際上,儲存點只是帶有更多資訊的外部化檢查點。
重要提示 :目前,Flink 的檢查點協調員僅保留最後一個成功完成的檢查點。這意味著每當新檢查點完成時,最後完成的檢查點將被丟棄。這也適用於外部化檢查點。
組態
儲存有關[外化]檢查點的後設資料的地方是在 flink-conf.yaml
中配置的(並且不能通過程式碼覆蓋):
# path to the externalized checkpoints
state.checkpoints.dir: file:///tmp/flink-backend/ext-checkpoints
請注意,此目錄僅包含還原檢查點所需的檢查點後設資料。實際的檢查點檔案仍儲存在其配置的目錄中(即 state.bachend.fs.checkpointdir
屬性)。
用法
你需要使用流環境的 getCheckpointConfig()
方法在程式碼中顯式啟用外部檢查點:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable regular checkpoints
env.enableCheckpointing(5000); // every 5 sec.
// enable externalized checkpoints
env.getCheckpointConfig()
.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
可用的 ExternalizedCheckpointCleanup
模式是:
RETAIN_ON_CANCELLATION
:最後一個檢查點及其後設資料保留在取消作業上; 事後清理是你的責任。DELETE_ON_CANCELLATION
:取消時刪除最後一個檢查點,這意味著只有在應用程式失敗時才可用。
要從外部化檢查點繼續,請使用儲存點語法。例如:
flink run -s /tmp/flink-backend/ext-checkpoints/savepoint-02d0cf7e02ea app.jar