KafkaConsumer 的例子
FlinkKafkaConsumer
讓你使用一個或多個 kafka 主題的資料。
版本
要使用的消費者取決於你的 kafka 分佈。
FlinkKafkaConsumer08
:使用 Kafka 的舊SimpleConsumer
API。抵消由 Flink 處理並提交給 zookeeper。FlinkKafkaConsumer09
:使用 Kafka 的新 Consumer API,它自動處理偏移和重新平衡。FlinkKafkaProducer010
:此聯結器支援帶有時間戳的 Kafka 訊息,用於生成和使用(對視窗操作很有用)。
用法
二進位制檔案不是 flink 核心的一部分,因此你需要匯入它們:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.${kafka.version}_2.10</artifactId>
<version>RELEASE</version>
</dependency>
建構函式有三個引數:
- 要讀取的一個或多個主題
- 反序列化模式告訴 Flink 如何解釋/解碼訊息
- kafka 使用者配置屬性。這些與常規卡夫卡消費者相同。最低要求是:
bootstrap.servers
:以 ip:port 形式的逗號分隔的 Kafka 代理列表。對於版本 8,請改用zookeeper.connect
(zookeeper 伺服器列表)group.id
:消費者群體的 ID(有關詳細資訊,請參閱 kafka 文件)
在 Java 中:
Properties properties = new Properties();
properties.put("group.id", "flink-kafka-example");
properties.put("bootstrap.servers", "localhost:9092");
DataStream<String> inputStream = env.addSource(
new FlinkKafkaConsumer09<>(
kafkaInputTopic, new SimpleStringSchema(), properties));
在 scala 中:
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
inputStream = env.addSource(
new FlinkKafkaConsumer08[String](
"topic", new SimpleStringSchema(), properties))
在開發過程中,你可以使用 kafka 屬性 enable.auto.commit=false
和 auto.offset.reset=earliest
在每次啟動你的 pogram 時重新生成相同的資料。