SimpleProducer(kafka 0.9)
配置和初始化
首先,创建一个 maven 项目并在你的 pom 中添加以下依赖项:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
</dependencies>
使用 Properties
对象初始化生成器。有许多属性允许你微调生产者行为。以下是所需的最小配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");
bootstrap-servers
是一个或多个代理的初始列表,供生产者能够发现群集的其余部分。serializer
属性告诉 Kafka 如何编码消息键和值。在这里,我们将发送字符串消息。虽然不是必需的,但始终建议设置 client.id
:这使你可以轻松地将代理上的请求与创建它的客户端实例相关联。
其他有趣的属性是:
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
你可以通过 acks
设置控制写入 Kafka 的消息的持久性。默认值 1
需要分区负责人明确确认写入成功。Kafka 提供的最强保证是 acks=all
,它保证分区领导者不仅接受写入,而且成功地复制到所有同步副本。你还可以使用值 0
来最大化吞吐量,但是你无法保证消息已成功写入代理的日志,因为在这种情况下代理甚至不发送响应。
retries
(默认为> 0)确定生产者是否在失败后尝试重新发送消息。请注意,如果重试次数> 0,则可能会发生消息重新排序,因为在后续写入成功后可能会发生重试。
Kafka 生产商尝试将已发送的消息分批收集以提高吞吐量。使用 Java 客户端,你可以使用 batch.size
来控制每个消息批处理的最大大小(以字节为单位)。为了给批量填充更多时间,你可以使用 linger.ms
让生产者延迟发送。最后,可以使用 compression.type
设置启用压缩。
使用 buffer.memory
限制 Java 客户端可用于收集未发送消息的总内存。当达到此限制时,生产者将在提出异常之前阻止其他发送,直到 max.block.ms
。此外,为避免将记录无限期排队,你可以使用 request.timeout.ms
设置超时。
完整的属性列表可在此处获得 。我建议你从 Confluent 阅读这篇文章了解更多详情。
发送消息
send()
方法是异步的。调用时,它会将记录添加到待处理记录发送的缓冲区中并立即返回。这允许生产者将各个记录一起批处理以提高效率。
发送的结果是 RecordMetadata
,指定记录发送到的分区以及分配的偏移量。由于发送调用是异步的,因此它将为将分配给此记录的 RecordMetadata 返回 Future
。要查阅元数据,你可以调用 get()
,它将阻止直到请求完成或使用回调。
// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );
代码
public class SimpleProducer{
public static void main( String[] args ) throws ExecutionException, InterruptedException{
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put( "client.id", "octopus" );
String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<>( props );
for( int i = 0; i < 10; i++ ){
ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
producer.send( message );
System.out.println("message sent.");
}
producer.close(); // don't forget this
}
}