SimpleProducer(kafka 0.9)
配置和初始化
首先,建立一個 maven 專案並在你的 pom 中新增以下依賴項:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
使用 Properties
物件初始化生成器。有許多屬性允許你微調生產者行為。以下是所需的最小配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");
bootstrap-servers
是一個或多個代理的初始列表,供生產者能夠發現群集的其餘部分。serializer
屬性告訴 Kafka 如何編碼訊息鍵和值。在這裡,我們將傳送字串訊息。雖然不是必需的,但始終建議設定 client.id
:這使你可以輕鬆地將代理上的請求與建立它的客戶端例項相關聯。
其他有趣的屬性是:
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
你可以通過 acks
設定控制寫入 Kafka 的訊息的永續性。預設值 1
需要分割槽負責人明確確認寫入成功。Kafka 提供的最強保證是 acks=all
,它保證分割槽領導者不僅接受寫入,而且成功地複製到所有同步副本。你還可以使用值 0
來最大化吞吐量,但是你無法保證訊息已成功寫入代理的日誌,因為在這種情況下代理甚至不傳送響應。
retries
(預設為> 0)確定生產者是否在失敗後嘗試重新傳送訊息。請注意,如果重試次數> 0,則可能會發生訊息重新排序,因為在後續寫入成功後可能會發生重試。
Kafka 生產商嘗試將已傳送的訊息分批收集以提高吞吐量。使用 Java 客戶端,你可以使用 batch.size
來控制每個訊息批處理的最大大小(以位元組為單位)。為了給批量填充更多時間,你可以使用 linger.ms
讓生產者延遲傳送。最後,可以使用 compression.type
設定啟用壓縮。
使用 buffer.memory
限制 Java 客戶端可用於收集未傳送訊息的總記憶體。當達到此限制時,生產者將在提出異常之前阻止其他傳送,直到 max.block.ms
。此外,為避免將記錄無限期排隊,你可以使用 request.timeout.ms
設定超時。
完整的屬性列表可在此處獲得 。我建議你從 Confluent 閱讀這篇文章瞭解更多詳情。
傳送訊息
send()
方法是非同步的。呼叫時,它會將記錄新增到待處理記錄傳送的緩衝區中並立即返回。這允許生產者將各個記錄一起批處理以提高效率。
傳送的結果是 RecordMetadata
,指定記錄傳送到的分割槽以及分配的偏移量。由於傳送呼叫是非同步的,因此它將為將分配給此記錄的 RecordMetadata 返回 Future
。要查閱後設資料,你可以呼叫 get()
,它將阻止直到請求完成或使用回撥。
// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );
程式碼
public class SimpleProducer{
public static void main( String[] args ) throws ExecutionException, InterruptedException{
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put( "client.id", "octopus" );
String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<>( props );
for( int i = 0; i < 10; i++ ){
ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
producer.send( message );
System.out.println("message sent.");
}
producer.close(); // don't forget this
}
}