內建反序列化模式

SimpleStringSchemaSimpleStringSchema 將訊息反序列化為字串。如果你的訊息有金鑰,後者將被忽略。

new FlinkKafkaConsumer09<>(kafkaInputTopic, new SimpleStringSchema(), prop);

JSONDeserializationSchema

JSONDeserializationSchema 使用 jackson 反序列化 json 格式的訊息並返回 com.fasterxml.jackson.databind.node.ObjectNode 物件的流。然後,你可以使用 .get("property") 方法訪問欄位。再次,鍵被忽略。

new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONDeserializationSchema(), prop);

JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema 與前一個非常相似,但處理帶有 json 編碼鍵和值的訊息。

boolean fetchMetadata = true;
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONKeyValueDeserializationSchema(fetchMetadata), properties);

返回的 ObjectNode 包含以下欄位:

  • key:金鑰中存在的所有欄位
  • value:所有訊息欄位
  • (可選)metadata:公開訊息的 offsetpartitiontopic(將 true 傳遞給建構函式以獲取後設資料)。

例如:

kafka-console-producer --broker-list localhost:9092 --topic json-topic \
    --property parse.key=true \
    --property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}
^C

將解碼為:

{
    "key":{"keyField1":1,"keyField2":2},
    "value":{"valueField1":1,"valueField2":{"foo":"bar"}},
    "metadata":{
        "offset":43,
        "topic":"json-topic",
        "partition":0
    }
}