Apache Kafka 介绍

Apache Kafka 是一个分布式流媒体平台。

意思是

1-It 允许你发布和订阅记录流。在这方面,它类似于消​​息队列或企业消息传递系统。

2-It 允许你以容错方式存储记录流。

3-It 允许你在记录发生时处理它们。

它被用于两大类应用:

1 - 构建实时流数据管道,可在系统或应用程序之间可靠地获取数据

2 - 构建转换或响应数据流的实时流应用程序

Kafka 控制台脚本对于基于 Unix 和 Windows 的平台是不同的。在示例中,你可能需要根据平台添加扩展。Linux:脚本位于 bin/,扩展名为 .sh。Windows:脚本位于 bin\windows\并具有 .bat 扩展名。

安装

第 1 步: 下载代码并解压缩:

tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0

第 2 步: 启动服务器。

以后可以删除主题,打开 server.properties 并将 delete.topic.enable 设置为 true。

Kafka 在很大程度上依赖于 zookeeper,所以你需要先启动它。如果你没有安装它,你可以使用与 kafka 一起打包的便捷脚本来获得一个快速且脏的单节点 ZooKeeper 实例。

zookeeper-server-start config/zookeeper.properties
kafka-server-start config/server.properties

第 3 步: 确保一切正常运行

你现在应该让 zookeeper 听 localhost:2181localhost:6667 上的一个 kafka 经纪人。

创建一个主题

我们只有一个代理,因此我们创建一个没有复制因子且只有一个分区的主题:

kafka-topics --zookeeper localhost:2181 \
    --create \
    --replication-factor 1 \
    --partitions 1 \
    --topic test-topic

检查你的主题:

kafka-topics --zookeeper localhost:2181 --list
test-topic

kafka-topics --zookeeper localhost:2181 --describe --topic test-topic
Topic:test-topic    PartitionCount:1    ReplicationFactor:1 Configs:
Topic: test-topic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0

发送和接收消息

启动消费者:

kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic

在另一个终端上,启动生产者并发送一些消息。默认情况下,该工具将每行作为单独的消息发送给代理,而无需特殊编码。写一些行并用 CTRL + D 或 CTRL + C 退出:

kafka-console-producer --broker-list localhost:9092 --topic test-topic   
a message
another message
^D

消息应出现在消费者的终端中。

停止卡夫卡

kafka-server-stop 

启动多代理群集

以上示例仅使用一个代理。要设置真正的集群,我们只需要启动多个 kafka 服务器。他们会自动协调自己。

步骤 1: 为避免冲突,我们为每个代理创建一个 server.properties 文件,并更改 idportlogfile 配置属性。

复制:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

编辑每个文件的属性,例如:

vim config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/usr/local/var/lib/kafka-logs-1

vim config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/usr/local/var/lib/kafka-logs-2

第 2 步: 启动三个经纪人:

    kafka-server-start config/server.properties &
    kafka-server-start config/server-1.properties &
    kafka-server-start config/server-2.properties &

创建复制主题

kafka-topics --zookeeper localhost:2181 --create --replication-factor 3 --partitions 1 --topic replicated-topic

kafka-topics --zookeeper localhost:2181 --describe --topic replicated-topic
Topic:replicated-topic  PartitionCount:1    ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

这一次,有更多信息:

  • leader 是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
  • replicas 是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
  • isr同步复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。

请注意,先前创建的主题保持不变。

测试容错

将一些消息发布到新主题:

kafka-console-producer --broker-list localhost:9092 --topic replicated-topic
hello 1
hello 2
^C

杀死领导者(在我们的例子中为 1)。在 Linux 上:

ps aux | grep server-1.properties
kill -9 <PID>

在 Windows 上:

wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties" 
taskkill /pid <PID> /f

看看发生了什么:

kafka-topics --zookeeper localhost:2181  --describe --topic replicated-topic
Topic:replicated-topic  PartitionCount:1    ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

领导层已经转向经纪人 2 和 1 不再同步。但消息仍然存在(使用消费者自己检查)。

清理

删除这两个主题:

kafka-topics --zookeeper localhost:2181 --delete --topic test-topic
kafka-topics --zookeeper localhost:2181 --delete --topic replicated-topic