如何提交抵消

KafkaConsumers 可以在後臺自動提交偏移量(配置引數 enable.auto.commit = true),預設設定是什麼。那些自動提交是在 poll() 內完成的( 通常在迴圈中呼叫 )。可以通過 auto.commit.interval.ms 配置應該提交的偏移頻率。因為,自動提交嵌入在 poll() 中,poll() 由使用者程式碼呼叫,所以此引數定義了提交間隔的下限。

作為自動提交的替代方法,也可以手動管理偏移。為此,應禁用自動提交(enable.auto.commit = false)。對於手動提交,KafkaConsumers 提供了兩種方法,即 commitSync()commitAsync() 。如名稱所示,commitSync() 是一個阻塞呼叫,在成功提交偏移後返回,而 commitAsync() 立即返回。如果你想知道提交是否成功,你可以提供一個回撥處理程式(OffsetCommitCallback)方法引數。請注意,在兩次提交呼叫中,消費者都會提交最新的 poll() 呼叫的偏移量。例如。讓我們假設一個分割槽主題與一個消費者,最後一次呼叫 poll() 返回訊息 4,5,6。在提交時,將提交偏移量 6,因為這是消費者客戶端跟蹤的最新偏移量。同時,commitSync()commitAsync() 都允許更多控制你想要提交的偏移量:如果使用允許你指定 Map<TopicPartition, OffsetAndMetadata> 的相應過載,則消費者將只提交指定的偏移量(即,

已提交偏移的語義

提交的偏移量表示已經處理了直到此偏移量的所有訊息。因此,由於偏移量是連續數,因此提交偏移量 X 會隱式提交小於 X 的所有偏移量。因此,沒有必要單獨提交每個偏移量,並且一次提交多個偏移量,但只是提交最大的偏移量。

注意,通過設計,也可以提交比最後提交的偏移更小的偏移量。如果應該第二次讀取訊息,則可以這樣做。

加工保證

使用自動提交提供至少一次處理語義。基本假設是,poll() 僅在所有先前傳遞的訊息成功處理後才被呼叫。這確保了處理之後發生提交時不會丟失任何訊息。如果消費者在提交之前失敗,則從 Kafka 接收上次提交之後的所有訊息並再次處理。但是,此重試可能會導致重複,因為最後一次 poll() 呼叫的某些訊息可能已被處理但故障發生在自動提交呼叫之前。

如果需要最多一次處理語義,則必須禁用自動提交,並且應該在 poll() 之後直接執行手動 commitSync()。之後,訊息得到處理。這確保了訊息在處理之前提交,因此從不再次讀取。當然,如果失敗,某些訊息可能會丟失。