內建反序列化模式
SimpleStringSchema :SimpleStringSchema
將訊息反序列化為字串。如果你的訊息有金鑰,後者將被忽略。
new FlinkKafkaConsumer09<>(kafkaInputTopic, new SimpleStringSchema(), prop);
JSONDeserializationSchema
JSONDeserializationSchema
使用 jackson 反序列化 json 格式的訊息並返回 com.fasterxml.jackson.databind.node.ObjectNode
物件的流。然後,你可以使用 .get("property")
方法訪問欄位。再次,鍵被忽略。
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONDeserializationSchema(), prop);
JSONKeyValueDeserializationSchema
JSONKeyValueDeserializationSchema
與前一個非常相似,但處理帶有 json 編碼鍵和值的訊息。
boolean fetchMetadata = true;
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONKeyValueDeserializationSchema(fetchMetadata), properties);
返回的 ObjectNode
包含以下欄位:
key
:金鑰中存在的所有欄位value
:所有訊息欄位- (可選)
metadata
:公開訊息的offset
,partition
和topic
(將true
傳遞給建構函式以獲取後設資料)。
例如:
kafka-console-producer --broker-list localhost:9092 --topic json-topic \
--property parse.key=true \
--property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}
^C
將解碼為:
{
"key":{"keyField1":1,"keyField2":2},
"value":{"valueField1":1,"valueField2":{"foo":"bar"}},
"metadata":{
"offset":43,
"topic":"json-topic",
"partition":0
}
}