KafkaConsumer 的例子
FlinkKafkaConsumer 让你使用一个或多个 kafka 主题的数据。
版本
要使用的消费者取决于你的 kafka 分布。
- FlinkKafkaConsumer08:使用 Kafka 的旧- SimpleConsumerAPI。抵消由 Flink 处理并提交给 zookeeper。
- FlinkKafkaConsumer09:使用 Kafka 的新 Consumer API,它自动处理偏移和重新平衡。
- FlinkKafkaProducer010:此连接器支持带有时间戳的 Kafka 消息,用于生成和使用(对窗口操作很有用)。
用法
二进制文件不是 flink 核心的一部分,因此你需要导入它们:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.${kafka.version}_2.10</artifactId>
  <version>RELEASE</version>
</dependency>
构造函数有三个参数:
- 要读取的一个或多个主题
- 反序列化模式告诉 Flink 如何解释/解码消息
- kafka 使用者配置属性。这些与常规卡夫卡消费者相同。最低要求是:
- bootstrap.servers:以 ip:port 形式的逗号分隔的 Kafka 代理列表。对于版本 8,请改用- zookeeper.connect(zookeeper 服务器列表)
- group.id:消费者群体的 ID(有关详细信息,请参阅 kafka 文档)
 
在 Java 中:
Properties properties = new Properties();
properties.put("group.id", "flink-kafka-example");
properties.put("bootstrap.servers", "localhost:9092");
DataStream<String> inputStream = env.addSource( 
        new FlinkKafkaConsumer09<>(
            kafkaInputTopic, new SimpleStringSchema(), properties));
在 scala 中:
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
inputStream = env.addSource(
        new FlinkKafkaConsumer08[String](
            "topic", new SimpleStringSchema(), properties))
在开发过程中,你可以使用 kafka 属性 enable.auto.commit=false 和 auto.offset.reset=earliest 在每次启动你的 pogram 时重新生成相同的数据。