内置反序列化模式
SimpleStringSchema :SimpleStringSchema
将消息反序列化为字符串。如果你的消息有密钥,后者将被忽略。
new FlinkKafkaConsumer09<>(kafkaInputTopic, new SimpleStringSchema(), prop);
JSONDeserializationSchema
JSONDeserializationSchema
使用 jackson 反序列化 json 格式的消息并返回 com.fasterxml.jackson.databind.node.ObjectNode
对象的流。然后,你可以使用 .get("property")
方法访问字段。再次,键被忽略。
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONDeserializationSchema(), prop);
JSONKeyValueDeserializationSchema
JSONKeyValueDeserializationSchema
与前一个非常相似,但处理带有 json 编码键和值的消息。
boolean fetchMetadata = true;
new FlinkKafkaConsumer09<>(kafkaInputTopic, new JSONKeyValueDeserializationSchema(fetchMetadata), properties);
返回的 ObjectNode
包含以下字段:
key
:密钥中存在的所有字段value
:所有消息字段- (可选)
metadata
:公开消息的offset
,partition
和topic
(将true
传递给构造函数以获取元数据)。
例如:
kafka-console-producer --broker-list localhost:9092 --topic json-topic \
--property parse.key=true \
--property key.separator=|
{"keyField1": 1, "keyField2": 2} | {"valueField1": 1, "valueField2" : {"foo": "bar"}}
^C
将解码为:
{
"key":{"keyField1":1,"keyField2":2},
"value":{"valueField1":1,"valueField2":{"foo":"bar"}},
"metadata":{
"offset":43,
"topic":"json-topic",
"partition":0
}
}