Gson(de) 序列化器
此示例使用 gson 库将 java 对象映射到 json 字符串。 (de)序列化器是通用的,但它们并不总是需要!
串行
码
public class GsonSerializer<T> implements Serializer<T> {
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
// this is called right after construction
// use it for initialisation
}
@Override
public byte[] serialize(String s, T t) {
return gson.toJson(t).getBytes();
}
@Override
public void close() {
// this is called right before destruction
}
}
用法
序列化程序通过所需的 key.serializer
和 value.serializer
生产者属性定义。
假设我们有一个名为 SensorValue
的 POJO 类,我们想要生成没有任何键的消息(键设置为 null
):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ...
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());
Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ...
producer.close();
(key.serializer
是必需的配置。由于我们没有指定消息密钥,我们保留 StringSerializer
随附 kafka,它能够处理 null
)。
解串器
码
public class GsonDeserializer<T> implements Deserializer<T> {
public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
private Class<T> cls;
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
String clsName = String.valueOf(config.get(configKey));
try {
cls = (Class<T>) Class.forName(clsName);
} catch (ClassNotFoundException e) {
System.err.printf("Failed to configure GsonDeserializer. " +
"Did you forget to specify the '%s' property ?%n",
configKey);
}
}
@Override
public T deserialize(String topic, byte[] bytes) {
return (T) gson.fromJson(new String(bytes), cls);
}
@Override
public void close() {}
}
用法
反序列化器通过所需的 key.deserializer
和 value.deserializer
消费者属性来定义。
假设我们有一个名为 SensorValue
的 POJO 类,我们想要生成没有任何键的消息(键设置为 null
):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ...
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());
try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
// ... consume messages ...
}
在这里,我们为消费者配置添加一个自定义属性,即 CONFIG_VALUE_CLASS
。GsonDeserializer
将在 configure()
方法中使用它来确定它应该处理的 POJO 类(添加到 props
的所有属性将以地图的形式传递给 configure
方法)。