一些理论知识
kafka的特点
1.数据吞吐量大,低延时,扩展性好
2.集群容错性高,允许少量节点崩溃
3.功能简单,主要关注消息传递,不支持死信队列,顺序消息等高级功能
4.允许少量数据丢失
kafka基础概念
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据
Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition.
Producer:生产者,负责发布消息到Kafka broker
Consumer:消费者,向Kafka broker读取消息的客户端。
zookeeper可以帮助kafka管理broker、consumer。kafka在创建Broker后,向zookeeper注册新的broker信息,实现在服务器正常运行下的水平拓展。
kafka使用
kafka的bin目录下有很多脚本可以直接使用,如topic.sh,customer.sh,provider.sh等,使用—help可以查看脚本用法
kafka单机部署
kafka是自带zookeeper的,但是集群环境中不用,一般都自己再额外部署一个zookeeper集群
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| 下载jdk-8u431-linux-x64.rpm rpm -ivh jdk-8u431-linux-x64.rpm java -version java version "1.8.0_431" Java(TM) SE Runtime Environment (build 1.8.0_431-b10) Java HotSpot(TM) 64-Bit Server VM (build 25.431-b10, mixed mode)
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
tar -xf apache-zookeeper-3.8.4-bin.tar.gz mv apache-zookeeper-3.8.4-bin /usr/local/zookeeper tar -xf kafka_2.13-3.8.0.tgz mv kafka_2.13-3.8.0 /usr/local/kafka
nohup /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & jps 3635 QuorumPeerMain 4174 Jps
nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & jps 3635 QuorumPeerMain 4693 Jps 4270 Kafka
|
kafka与zookeeper集群部署
为了让kafka具有更好的性能、存储更多的消息、防止单点服务故障
kafka集群为每个partition配置一个或多个备份,确保数据不会丢失
kafka集群通过zookeeper集群作为选举中心,给每个partition选举出一个主节点,其他节点就是从节点。主节点负责相应客户端需求与保存信息,从节点负责同步主节点数据。当主节点故障时,kafka会选举出从节点作为新主节点
kafka的broker信息、partition的选举信息,都会保存在zookeeper集群中,这样kafka集群就不会因为某些broker崩溃而中断
环境说明
1 2 3 4 5 6 7 8 9 10
| VMware Workstation 17pro Centos7.9.2009 4C4G 20G精简置备 jdk 1.8.0_431 kafka 3.8.0 zookeeper 3.8.4 LTS
192.168.8.151 zookeeper1 192.168.8.152 zookeeper2 192.168.8.153 zookeeper3
|
环境初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| cp -a /etc/yum.repos.d /etc/yum.repos.d.backup rm -f /etc/yum.repos.d/* curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo yum clean all && yum makecache
sed -i 's/^SELINUX=.*/SELINUX=disabled/g' /etc/selinux/config setenforce 0 systemctl disable firewalld --now
yum -y install chrony sed -i 's/^server/#server/g' /etc/chrony.conf sed -i '1s/^/server cn.pool.ntp.org iburst\n/' /etc/chrony.conf systemctl restart chronyd
cat >/etc/hosts<<EOF 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.8.151 zookeeper1 192.168.8.152 zookeeper2 192.168.8.153 zookeeper3 EOF
|
软件包安装
1 2 3 4 5 6 7 8
| 下载jdk-8u431-linux-x64.rpm rpm -ivh jdk-8u431-linux-x64.rpm wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz tar -xf apache-zookeeper-3.8.4-bin.tar.gz mv apache-zookeeper-3.8.4-bin /usr/local/zookeeper tar -xf kafka_2.13-3.8.0.tgz mv kafka_2.13-3.8.0 /usr/local/kafka
|
zookeeper集群配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| cd /usr/local/zookeeper/conf/ cp zoo_sample.cfg zoo.cfg cat >zoo.cfg<<EOF tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/local/zookeeper/data clientPort=2181 server.1=192.168.8.151:2888:3888 # 2888集群传输端口 server.2=192.168.8.152:2888:3888 # 3888集群选举端口 server.3=192.168.8.153:2888:3888 EOF scp zoo.cfg zookeeper2:/usr/local/zookeeper/conf/zoo.cfg scp zoo.cfg zookeeper3:/usr/local/zookeeper/conf/zoo.cfg
mkdir /usr/local/zookeeper/data
echo 1 > /usr/local/zookeeper/data/myid
echo 2 > /usr/local/zookeeper/data/myid echo 3 > /usr/local/zookeeper/data/myid
|
kafka集群配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| /usr/local/kafka/config/server.properties文件 cd /usr/local/kafka/config/ cp server.properties server.properties.bak cat > /usr/local/kafka/config/server.properties<<EOF # brokerID,每台都要不同 broker.id=1 # 服务监听地址 listeners=PLAINTEXT://zookeeper1:9092 # 日志位置 log.dirs=/usr/local/kafka/logs # 默认的topic分区数 num.partitions=1 # zookeeper连接地址 zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181 EOF
|
启动zookeeper与kafka集群
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| /usr/local/zookeeper/bin/zkServer.sh --config /usr/local/zookeeper/conf start
ss -tunlp | grep 888 tcp LISTEN 0 50 [::ffff:192.168.8.151]:3888 [::]:* users:(("java",pid=60408,fd=66))
jps 60466 Jps 60408 QuorumPeerMain
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
jps 57911 Kafka 56680 QuorumPeerMain 57994 Jps
ss -tunlp | grep 9092 tcp LISTEN 0 50 [::ffff:192.168.8.153]:9092 [::]:* users:(("java",pid=57911,fd=172))
|
测试——创建topic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| cd /usr/local/kafka/bin/
./kafka-topics.sh --bootstrap-server zookeeper2:9092 --create --replication-factor 2 --partitions 4 --topic test --create 创建新主题 --partitions 4 主题要分成4个分区 --topic 主题名称 --bootstrap-server 参数指定了Kafka集群的bootstrap服务器地址
./kafka-topics.sh --bootstrap-server zookeeper2:9092 --list test
./kafka-topics.sh --bootstrap-server zookeeper2:9092 --describe --topic test [2024-10-16 08:41:29,276] WARN [AdminClient clientId=adminclient-1] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. (org.apache.kafka.clients.admin.KafkaAdminClient) Topic: test TopicId: mPuRFZcjSoGIBDz3U3eaQA PartitionCount: 4 ReplicationFactor: 2 Configs: Topic: test Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: N/A LastKnownElr: N/A Topic: test Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Elr: N/A LastKnownElr: N/A Topic: test Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1 Elr: N/A LastKnownElr: N/A Topic: test Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3 Elr: N/A LastKnownElr: N/A
|