Apache Kafka 介绍
Apache Kafka 是一个分布式流媒体平台。
意思是
1-It 允许你发布和订阅记录流。在这方面,它类似于消息队列或企业消息传递系统。
2-It 允许你以容错方式存储记录流。
3-It 允许你在记录发生时处理它们。
它被用于两大类应用:
1 - 构建实时流数据管道,可在系统或应用程序之间可靠地获取数据
2 - 构建转换或响应数据流的实时流应用程序
Kafka 控制台脚本对于基于 Unix 和 Windows 的平台是不同的。在示例中,你可能需要根据平台添加扩展。Linux:脚本位于
bin/
,扩展名为.sh
。Windows:脚本位于bin\windows\
并具有.bat
扩展名。
安装
第 1 步: 下载代码并解压缩:
tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0
第 2 步: 启动服务器。
以后可以删除主题,打开
server.properties
并将delete.topic.enable
设置为 true。
Kafka 在很大程度上依赖于 zookeeper,所以你需要先启动它。如果你没有安装它,你可以使用与 kafka 一起打包的便捷脚本来获得一个快速且脏的单节点 ZooKeeper 实例。
zookeeper-server-start config/zookeeper.properties
kafka-server-start config/server.properties
第 3 步: 确保一切正常运行
你现在应该让 zookeeper 听 localhost:2181
和 localhost:6667
上的一个 kafka 经纪人。
创建一个主题
我们只有一个代理,因此我们创建一个没有复制因子且只有一个分区的主题:
kafka-topics --zookeeper localhost:2181 \
--create \
--replication-factor 1 \
--partitions 1 \
--topic test-topic
检查你的主题:
kafka-topics --zookeeper localhost:2181 --list
test-topic
kafka-topics --zookeeper localhost:2181 --describe --topic test-topic
Topic:test-topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
发送和接收消息
启动消费者:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic
在另一个终端上,启动生产者并发送一些消息。默认情况下,该工具将每行作为单独的消息发送给代理,而无需特殊编码。写一些行并用 CTRL + D 或 CTRL + C 退出:
kafka-console-producer --broker-list localhost:9092 --topic test-topic
a message
another message
^D
消息应出现在消费者的终端中。
停止卡夫卡
kafka-server-stop
启动多代理群集
以上示例仅使用一个代理。要设置真正的集群,我们只需要启动多个 kafka 服务器。他们会自动协调自己。
步骤 1: 为避免冲突,我们为每个代理创建一个 server.properties
文件,并更改 id
,port
和 logfile
配置属性。
复制:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
编辑每个文件的属性,例如:
vim config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/usr/local/var/lib/kafka-logs-1
vim config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/usr/local/var/lib/kafka-logs-2
第 2 步: 启动三个经纪人:
kafka-server-start config/server.properties &
kafka-server-start config/server-1.properties &
kafka-server-start config/server-2.properties &
创建复制主题
kafka-topics --zookeeper localhost:2181 --create --replication-factor 3 --partitions 1 --topic replicated-topic
kafka-topics --zookeeper localhost:2181 --describe --topic replicated-topic
Topic:replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
这一次,有更多信息:
leader
是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。replicas
是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。isr
是同步复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。
请注意,先前创建的主题保持不变。
测试容错
将一些消息发布到新主题:
kafka-console-producer --broker-list localhost:9092 --topic replicated-topic
hello 1
hello 2
^C
杀死领导者(在我们的例子中为 1)。在 Linux 上:
ps aux | grep server-1.properties
kill -9 <PID>
在 Windows 上:
wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
taskkill /pid <PID> /f
看看发生了什么:
kafka-topics --zookeeper localhost:2181 --describe --topic replicated-topic
Topic:replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
领导层已经转向经纪人 2 和 1
不再同步。但消息仍然存在(使用消费者自己检查)。
清理
删除这两个主题:
kafka-topics --zookeeper localhost:2181 --delete --topic test-topic
kafka-topics --zookeeper localhost:2181 --delete --topic replicated-topic