Apache Kafka 介紹
Apache Kafka 是一個分散式流媒體平臺。
意思是
1-It 允許你釋出和訂閱記錄流。在這方面,它類似於消息佇列或企業訊息傳遞系統。
2-It 允許你以容錯方式儲存記錄流。
3-It 允許你在記錄發生時處理它們。
它被用於兩大類應用:
1 - 構建實時流資料管道,可在系統或應用程式之間可靠地獲取資料
2 - 構建轉換或響應資料流的實時流應用程式
Kafka 控制檯指令碼對於基於 Unix 和 Windows 的平臺是不同的。在示例中,你可能需要根據平臺新增擴充套件。Linux:指令碼位於
bin/
,副檔名為.sh
。Windows:指令碼位於bin\windows\
並具有.bat
副檔名。
安裝
第 1 步: 下載程式碼並解壓縮:
tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0
第 2 步: 啟動伺服器。
以後可以刪除主題,開啟
server.properties
並將delete.topic.enable
設定為 true。
Kafka 在很大程度上依賴於 zookeeper,所以你需要先啟動它。如果你沒有安裝它,你可以使用與 kafka 一起打包的便捷指令碼來獲得一個快速且髒的單節點 ZooKeeper 例項。
zookeeper-server-start config/zookeeper.properties
kafka-server-start config/server.properties
第 3 步: 確保一切正常執行
你現在應該讓 zookeeper 聽 localhost:2181
和 localhost:6667
上的一個 kafka 經紀人。
建立一個主題
我們只有一個代理,因此我們建立一個沒有複製因子且只有一個分割槽的主題:
kafka-topics --zookeeper localhost:2181 \
--create \
--replication-factor 1 \
--partitions 1 \
--topic test-topic
檢查你的主題:
kafka-topics --zookeeper localhost:2181 --list
test-topic
kafka-topics --zookeeper localhost:2181 --describe --topic test-topic
Topic:test-topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
傳送和接收訊息
啟動消費者:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic
在另一個終端上,啟動生產者併傳送一些訊息。預設情況下,該工具將每行作為單獨的訊息傳送給代理,而無需特殊編碼。寫一些行並用 CTRL + D 或 CTRL + C 退出:
kafka-console-producer --broker-list localhost:9092 --topic test-topic
a message
another message
^D
訊息應出現在消費者的終端中。
停止卡夫卡
kafka-server-stop
啟動多代理群集
以上示例僅使用一個代理。要設定真正的叢集,我們只需要啟動多個 kafka 伺服器。他們會自動協調自己。
步驟 1: 為避免衝突,我們為每個代理建立一個 server.properties
檔案,並更改 id
,port
和 logfile
配置屬性。
複製:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
編輯每個檔案的屬性,例如:
vim config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/usr/local/var/lib/kafka-logs-1
vim config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/usr/local/var/lib/kafka-logs-2
第 2 步: 啟動三個經紀人:
kafka-server-start config/server.properties &
kafka-server-start config/server-1.properties &
kafka-server-start config/server-2.properties &
建立複製主題
kafka-topics --zookeeper localhost:2181 --create --replication-factor 3 --partitions 1 --topic replicated-topic
kafka-topics --zookeeper localhost:2181 --describe --topic replicated-topic
Topic:replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
這一次,有更多資訊:
leader
是負責給定分割槽的所有讀取和寫入的節點。每個節點將成為隨機選擇的分割槽部分的領導者。replicas
是複製此分割槽日誌的節點列表,無論它們是否為領導者,或者即使它們當前處於活動狀態。isr
是同步複製品的集合。這是副本列表的子集,該列表當前處於活躍狀態並且已經被領導者捕獲。
請注意,先前建立的主題保持不變。
測試容錯
將一些訊息釋出到新主題:
kafka-console-producer --broker-list localhost:9092 --topic replicated-topic
hello 1
hello 2
^C
殺死領導者(在我們的例子中為 1)。在 Linux 上:
ps aux | grep server-1.properties
kill -9 <PID>
在 Windows 上:
wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
taskkill /pid <PID> /f
看看發生了什麼:
kafka-topics --zookeeper localhost:2181 --describe --topic replicated-topic
Topic:replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
領導層已經轉向經紀人 2 和 1
不再同步。但訊息仍然存在(使用消費者自己檢查)。
清理
刪除這兩個主題:
kafka-topics --zookeeper localhost:2181 --delete --topic test-topic
kafka-topics --zookeeper localhost:2181 --delete --topic replicated-topic