SimpleProducer(kafka 0.9)

配置和初始化

首先,创建一个 maven 项目并在你的 pom 中添加以下依赖项:

 <dependencies>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.9.0.1</version>
     </dependency>
 </dependencies>

使用 Properties 对象初始化生成器。有许多属性允许你微调生产者行为。以下是所需的最小配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "simple-producer-XX");

bootstrap-servers 是一个或多个代理的初始列表,供生产者能够发现群集的其余部分。serializer 属性告诉 Kafka 如何编码消息键和值。在这里,我们将发送字符串消息。虽然不是必需的,但始终建议设置 client.id:这使你可以轻松地将代理上的请求与创建它的客户端实例相关联。

其他有趣的属性是:

props.put("acks", "all"); 
props.put("retries", 0);  
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);

你可以通过 acks 设置控制写入 Kafka 的消息持久性。默认值 1 需要分区负责人明确确认写入成功。Kafka 提供的最强保证是 acks=all,它保证分区领导者不仅接受写入,而且成功地复制到所有同步副本。你还可以使用值 0 来最大化吞吐量,但是你无法保证消息已成功写入代理的日志,因为在这种情况下代理甚至不发送响应。

retries(默认为> 0)确定生产者是否在失败后尝试重新发送消息。请注意,如果重试次数> 0,则可能会发生消息重新排序,因为在后续写入成功后可能会发生重试。

Kafka 生产商尝试将已发送的消息分批收集以提高吞吐量。使用 Java 客户端,你可以使用 batch.size 来控制每个消息批处理的最大大小(以字节为单位)。为了给批量填充更多时间,你可以使用 linger.ms 让生产者延迟发送。最后,可以使用 compression.type 设置启用压缩。

使用 buffer.memory 限制 Java 客户端可用于收集未发送消息的总内存。当达到此限制时,生产者将在提出异常之前阻止其他发送,直到 max.block.ms。此外,为避免将记录无限期排队,你可以使用 request.timeout.ms 设置超时。

完整的属性列表可在此处获得 。我建议你从 Confluent 阅读这篇文章了解更多详情。

发送消息

send() 方法是异步的。调用时,它会将记录添加到待处理记录发送的缓冲区中并立即返回。这允许生产者将各个记录一起批处理以提高效率。

发送的结果是 RecordMetadata,指定记录发送到的分区以及分配的偏移量。由于发送调用是异步的,因此它将为将分配给此记录的 RecordMetadata 返回 Future。要查阅元数据,你可以调用 get(),它将阻止直到请求完成或使用回调。

// synchronous call with get()
RecordMetadata recordMetadata = producer.send( message ).get();
// callback with a lambda
producer.send( message, ( recordMetadata, error ) -> System.out.println(recordMetadata) );

代码

public class SimpleProducer{
    
    public static void main( String[] args ) throws ExecutionException, InterruptedException{
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put( "client.id", "octopus" );

        String topic = "test-topic";

        Producer<String, String> producer = new KafkaProducer<>( props );

        for( int i = 0; i < 10; i++ ){
            ProducerRecord<String, String> message = new ProducerRecord<>( topic, "this is message " + i );
            producer.send( message );
            System.out.println("message sent.");
        }

        producer.close(); // don't forget this
    }
}