如何提交抵消
KafkaConsumers 可以在后台自动提交偏移量(配置参数 enable.auto.commit = true
),默认设置是什么。那些自动提交是在 poll()
内完成的( 通常在循环中调用 )。可以通过 auto.commit.interval.ms
配置应该提交的偏移频率。因为,自动提交嵌入在 poll()
中,poll()
由用户代码调用,所以此参数定义了提交间隔的下限。
作为自动提交的替代方法,也可以手动管理偏移。为此,应禁用自动提交(enable.auto.commit = false
)。对于手动提交,KafkaConsumers
提供了两种方法,即 commitSync()
和 commitAsync()
。如名称所示,commitSync()
是一个阻塞调用,在成功提交偏移后返回,而 commitAsync()
立即返回。如果你想知道提交是否成功,你可以提供一个回调处理程序(OffsetCommitCallback
)方法参数。请注意,在两次提交调用中,消费者都会提交最新的 poll()
调用的偏移量。例如。让我们假设一个分区主题与一个消费者,最后一次调用 poll()
返回消息 4,5,6。在提交时,将提交偏移量 6,因为这是消费者客户端跟踪的最新偏移量。同时,commitSync()
和 commitAsync()
都允许更多控制你想要提交的偏移量:如果使用允许你指定 Map<TopicPartition, OffsetAndMetadata>
的相应重载,则消费者将只提交指定的偏移量(即,
已提交偏移的语义
提交的偏移量表示已经处理了直到此偏移量的所有消息。因此,由于偏移量是连续数,因此提交偏移量 X
会隐式提交小于 X
的所有偏移量。因此,没有必要单独提交每个偏移量,并且一次提交多个偏移量,但只是提交最大的偏移量。
注意,通过设计,也可以提交比最后提交的偏移更小的偏移量。如果应该第二次读取消息,则可以这样做。
加工保证
使用自动提交提供至少一次处理语义。基本假设是,poll()
仅在所有先前传递的消息成功处理后才被调用。这确保了在处理之后发生提交时不会丢失任何消息。如果消费者在提交之前失败,则从 Kafka 接收上次提交之后的所有消息并再次处理。但是,此重试可能会导致重复,因为最后一次 poll()
调用的某些消息可能已被处理但故障发生在自动提交调用之前。
如果需要最多一次处理语义,则必须禁用自动提交,并且应该在 poll()
之后直接执行手动 commitSync()
。之后,消息得到处理。这确保了消息在处理之前提交,因此从不再次读取。当然,如果失败,某些消息可能会丢失。