內建反序列化模式
SimpleStringSchema :SimpleStringSchema 將訊息反序列化為字串。如果你的訊息有金鑰,後者將被忽略。
new FlinkKafkaConsumer09<>(kafkaInputTopic, new SimpleStringSchema(), prop);
JSONDeserializationSchema
JSONDeserializationSchema 使用 jackson 反序列化 json 格式的訊息並返回 com.fasterxml.jackson.databind.node.ObjectNode 物件的流。然後,你可以使用 .get("property") 方法訪問欄位。再次,鍵被忽略。
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONDeserializationSchema(), prop);
JSONKeyValueDeserializationSchema
JSONKeyValueDeserializationSchema 與前一個非常相似,但處理帶有 json 編碼鍵和值的訊息。
boolean fetchMetadata = true;
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONKeyValueDeserializationSchema(fetchMetadata), properties);
返回的 ObjectNode 包含以下欄位:
key:金鑰中存在的所有欄位value:所有訊息欄位- (可選)
metadata:公開訊息的offset,partition和topic(將true傳遞給建構函式以獲取後設資料)。
例如:
kafka-console-producer --broker-list localhost:9092 --topic json-topic \
--property parse.key=true \
--property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}
^C
將解碼為:
{
"key":{"keyField1":1,"keyField2":2},
"value":{"valueField1":1,"valueField2":{"foo":"bar"}},
"metadata":{
"offset":43,
"topic":"json-topic",
"partition":0
}
}