KafkaConsumer 的例子

FlinkKafkaConsumer 讓你使用一個或多個 kafka 主題的資料。

版本

要使用的消費者取決於你的 kafka 分佈。

  • FlinkKafkaConsumer08:使用 Kafka 的舊 SimpleConsumer API。抵消由 Flink 處理並提交給 zookeeper。
  • FlinkKafkaConsumer09:使用 Kafka 的新 Consumer API,它自動處理偏移和重新平衡。
  • FlinkKafkaProducer010:此聯結器支援帶有時間戳的 Kafka 訊息,用於生成和使用(對視窗操作很有用)。

用法

二進位制檔案不是 flink 核心的一部分,因此你需要匯入它們:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.${kafka.version}_2.10</artifactId>
  <version>RELEASE</version>
</dependency>

建構函式有三個引數:

  • 要讀取的一個或多個主題
  • 反序列化模式告訴 Flink 如何解釋/解碼訊息
  • kafka 使用者配置屬性。這些與常規卡夫卡消費者相同。最低要求是:
    • bootstrap.servers:以 ip:port 形式的逗號分隔的 Kafka 代理列表。對於版本 8,請改用 zookeeper.connect(zookeeper 伺服器列表)
    • group.id:消費者群體的 ID(有關詳細資訊,請參閱 kafka 文件)

在 Java 中:

Properties properties = new Properties();
properties.put("group.id", "flink-kafka-example");
properties.put("bootstrap.servers", "localhost:9092");

DataStream<String> inputStream = env.addSource( 
        new FlinkKafkaConsumer09<>(
            kafkaInputTopic, new SimpleStringSchema(), properties));

在 scala 中:

val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

inputStream = env.addSource(
        new FlinkKafkaConsumer08[String](
            "topic", new SimpleStringSchema(), properties))

在開發過程中,你可以使用 kafka 屬性 enable.auto.commit=falseauto.offset.reset=earliest 在每次啟動你的 pogram 時重新生成相同的資料。