Testing checkpoints
The code
This is a simple Flink application that uses a stateful mapper with an Integer managed state. You can play with the checkpointEnable, checkpointInterval and checkpointMode variables to see their effect (a sketch with further checkpoint settings follows the listing):
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;

import java.time.LocalDateTime;
import java.util.Properties;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckpointExample {

    private static final Logger LOG = LoggerFactory.getLogger(CheckpointExample.class);
    private static final String KAFKA_BROKER = "localhost:9092";
    private static final String KAFKA_INPUT_TOPIC = "input-topic";
    private static final String KAFKA_GROUP_ID = "flink-stackoverflow-checkpointer";
    private static final String CLASS_NAME = CheckpointExample.class.getSimpleName();

    public static void main(String[] args) throws Exception {
        // play with them
        boolean checkpointEnable = false;
        long checkpointInterval = 1000;
        CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;
        // ----------------------------------------------------

        LOG.info(CLASS_NAME + ": starting...");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // kafka source
        // https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#kafka-consumer
        Properties prop = new Properties();
        prop.put("bootstrap.servers", KAFKA_BROKER);
        prop.put("group.id", KAFKA_GROUP_ID);
        prop.put("auto.offset.reset", "latest");
        prop.put("enable.auto.commit", "false");

        FlinkKafkaConsumer09<String> source = new FlinkKafkaConsumer09<>(
                KAFKA_INPUT_TOPIC, new SimpleStringSchema(), prop);

        // checkpoints
        // internals: https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#checkpointing
        // config: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/checkpointing.html
        if (checkpointEnable) env.enableCheckpointing(checkpointInterval, checkpointMode);

        env
            .addSource(source)
            .keyBy((any) -> 1)
            .flatMap(new StatefulMapper())
            .print();

        env.execute(CLASS_NAME);
    }

    /* *****************************************************************
     * Stateful mapper
     * (cf. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html)
     * ****************************************************************/
    public static class StatefulMapper extends RichFlatMapFunction<String, String> {
        private transient ValueState<Integer> state;

        @Override
        public void flatMap(String record, Collector<String> collector) throws Exception {
            // access the state value
            Integer currentState = state.value();
            // update the counts
            currentState += 1;
            collector.collect(String.format("%s: (%s,%d)",
                    LocalDateTime.now().format(ISO_LOCAL_DATE_TIME), record, currentState));
            // update the state
            state.update(currentState);
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Integer> descriptor =
                    new ValueStateDescriptor<>("CheckpointExample", TypeInformation.of(Integer.class), 0);
            state = getRuntimeContext().getState(descriptor);
        }
    }
}
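The example above only switches checkpointing on or off with a fixed interval and mode. If you want to experiment further, the checkpoint behaviour can also be tuned through the environment's CheckpointConfig (org.apache.flink.streaming.api.environment.CheckpointConfig). The fragment below is a minimal sketch meant to replace the single enableCheckpointing call in main(); the concrete values are arbitrary examples, not recommendations:

// finer-grained checkpoint tuning (replaces the enableCheckpointing call above);
// the numbers are arbitrary example values
env.enableCheckpointing(checkpointInterval, checkpointMode);
CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(500);   // leave at least 500 ms between two checkpoints
config.setCheckpointTimeout(60_000);         // give up on a checkpoint after 60 s
config.setMaxConcurrentCheckpoints(1);       // never run two checkpoints at the same time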
Running the example and simulating a failure
To be able to check your checkpoints, you need to start a cluster. The easiest way is to use the start-cluster.sh script in the flink/bin directory:
start-cluster.sh
Starting cluster.
[INFO] 1 instance(s) of jobmanager are already running on virusnest.
Starting jobmanager daemon on host virusnest.
Password:
Starting taskmanager daemon on host virusnest.
Now, package your application and submit it to Flink:
mvn clean package
flink run -c CheckpointExample target/flink-checkpoints-test.jar
Create some data (a programmatic alternative is sketched right after the command):
kafka-console-producer --broker-list localhost:9092 --topic input-topic
a
b
c
^D
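If you would rather produce the test records from Java than from the console producer, a minimal sketch using the plain Kafka producer client could look like this (it assumes the kafka-clients dependency is available; broker and topic match the constants used in the example):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProduceTestData {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // send the same records as typed into the console producer above
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String value : new String[]{"a", "b", "c"}) {
                producer.send(new ProducerRecord<>("input-topic", value));
            }
        }
    }
}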
The output should be available in flink/logs/flink-<user>-jobmanager-0-<host>.out. For example:
tail -f flink/logs/flink-Derlin-jobmanager-0-virusnest.out
2017-03-17T08:21:51.249: (a,1)
2017-03-17T08:21:51.545: (b,2)
2017-03-17T08:21:52.363: (c,3)
To test the checkpoints, simply kill the taskmanager (this simulates a failure), produce some more data and start a new one:
# killing the taskmanager
ps -ef | grep -i taskmanager
kill <taskmanager PID>
# starting a new taskmanager
flink/bin/taskmanager.sh start
Note: when starting a new taskmanager, it will use another log file, namely flink/logs/flink-<user>-jobmanager-1-<host>.out (note the integer increment).
What happens
- Checkpoints disabled: if you produce data during the failure, it will definitely be lost. But surprisingly enough, the counters will still be correct!
- Checkpoints enabled: no more data loss (and the counters stay correct).
- Checkpoints in at-least-once mode: you may see duplicates, especially if you set the checkpoint interval to a large value and kill the taskmanager several times (the snippet below shows the corresponding settings).
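To reproduce the at-least-once behaviour from the last point, it should be enough to flip the variables at the top of CheckpointExample.main(); the 10-second interval below is only an arbitrary choice to make duplicates easier to observe:

// in CheckpointExample.main(): checkpointing on, at-least-once semantics, long interval
boolean checkpointEnable = true;
long checkpointInterval = 10_000;   // arbitrary large value to widen the window for duplicates
CheckpointingMode checkpointMode = CheckpointingMode.AT_LEAST_ONCE;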