KafkaConsumer 的例子
FlinkKafkaConsumer
让你使用一个或多个 kafka 主题的数据。
版本
要使用的消费者取决于你的 kafka 分布。
FlinkKafkaConsumer08
:使用 Kafka 的旧SimpleConsumer
API。抵消由 Flink 处理并提交给 zookeeper。FlinkKafkaConsumer09
:使用 Kafka 的新 Consumer API,它自动处理偏移和重新平衡。FlinkKafkaProducer010
:此连接器支持带有时间戳的 Kafka 消息,用于生成和使用(对窗口操作很有用)。
用法
二进制文件不是 flink 核心的一部分,因此你需要导入它们:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.${kafka.version}_2.10</artifactId>
<version>RELEASE</version>
</dependency>
构造函数有三个参数:
- 要读取的一个或多个主题
- 反序列化模式告诉 Flink 如何解释/解码消息
- kafka 使用者配置属性。这些与常规卡夫卡消费者相同。最低要求是:
bootstrap.servers
:以 ip:port 形式的逗号分隔的 Kafka 代理列表。对于版本 8,请改用zookeeper.connect
(zookeeper 服务器列表)group.id
:消费者群体的 ID(有关详细信息,请参阅 kafka 文档)
在 Java 中:
Properties properties = new Properties();
properties.put("group.id", "flink-kafka-example");
properties.put("bootstrap.servers", "localhost:9092");
DataStream<String> inputStream = env.addSource(
new FlinkKafkaConsumer09<>(
kafkaInputTopic, new SimpleStringSchema(), properties));
在 scala 中:
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
inputStream = env.addSource(
new FlinkKafkaConsumer08[String](
"topic", new SimpleStringSchema(), properties))
在开发过程中,你可以使用 kafka 属性 enable.auto.commit=false
和 auto.offset.reset=earliest
在每次启动你的 pogram 时重新生成相同的数据。