Akemi

EFK日志处理平台-2

2024/06/02

总体思路:filebeat生产→kafka topic→logstash消费、格式转换→es→kibana

zookeeper简单介绍

zookeeper就是动物园管理员,他是用来管hadoop(大象)、Hive(蜜蜂)、pig(小
猪)、kafka消息系统的管理员, Apache Hbase 和 Apache Solr 的分布式集群都用到了 zookeeper;Zookeeper是一个分布式的、开源的程序协调服务,是 hadoop 项目下的一个子项目

Zookeeper主要作用在于:

1.节点选举

主节点挂了之后,从节点就会接手工作 ,并且,保证这个节点是唯一的,从而保证集群的高可用

2.统一配置文件管理

只需要部署一台服务器

则可以把相同的配置文件,同步更新到其他所有服务器,比如,修改了Hadoop,Kafka,redis统一配置等

3.发布与订阅消息

类似于消息队列,发布者把数据存在znode节点上,订阅者会读取这个数据

4.集群管理

集群中保证数据的一致性

Zookeeper的选举机制===>过半机制

安装的台数===>奇数台(否则无法过半机制)

一般情况下10台服务器需安装ZK3台 (华为FusionStorage平台在OSD组件仲裁的时候用的也是ZK)

20台======>5台

50台=======>7台

100台=======>11台

zookeeper角色:
leader领导者
follower跟随着
observer观察者
leader:负责发起选举和决议的,更新系统状态
follower:接收客户端的请求,给客户端返回结果,在选主的过程参与投票
observe:接收客户端的连接,同步leader状态,不参与选主

部署zookeeper集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
环境:
centos7.9
8C8G 50Gdisk
192.168.10.181 zoo1
192.168.10.182 zoo2
192.168.10.183 zoo3
filebeats版本7.13.1
kafka版本3.1.0
zookeeper版本3.8
logstash版本7.9.2

# 网络与主机名配置
hostnamectl set-hostname zoo1 && bash
nmcli con modify ens18 ipv4.addresses 192.168.10.181/24 ipv4.dns 192.168.1.1 ipv4.gateway 192.168.10.1 ipv4.method manual
nmcli con up ens18

hostnamectl set-hostname zoo2 && bash
nmcli con modify ens18 ipv4.addresses 192.168.10.182/24 ipv4.dns 192.168.1.1 ipv4.gateway 192.168.10.1 ipv4.method manual
nmcli con up ens18

hostnamectl set-hostname zoo3 && bash
nmcli con modify ens18 ipv4.addresses 192.168.10.183/24 ipv4.dns 192.168.1.1 ipv4.gateway 192.168.10.1 ipv4.method manual
nmcli con up ens18

# host设置
cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.10.181 zoo1
192.168.10.182 zoo2
192.168.10.183 zoo3
scp /etc/hosts zoo1:/etc/hosts
scp /etc/hosts zoo2:/etc/hosts
scp /etc/hosts zoo3:/etc/hosts

# 安全
sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config
setenforce 0
systemctl disable firewalld --now

# 时钟同步
yum -y install chrony
sed -i 's/^server/#server/g' /etc/chrony.conf
sed -i '1s/^/server cn.pool.ntp.org iburst\n/' /etc/chrony.conf
systemctl restart chronyd

# 上传压缩包并解压缩
tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt/

# 创建数据目录与日志目录
mv /opt/apache-zookeeper-3.8.0-bin/ /opt/zookeeper
mkdir /opt/zookeeper/zkData
mkdir /opt/zookeeper/zkLog

# 修改配置文件
指定数据目录与日志目录,指定另外几个节点的IP:
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
传送到另外两个节点:
scp zoo.cfg zoo1:/opt/zookeeper/conf/
scp zoo.cfg zoo2:/opt/zookeeper/conf/
scp zoo.cfg zoo3:/opt/zookeeper/conf/

# 准备启动zookeeper
安装java
yum -y install java

设置三台的myid
zoo1:
echo 1 > /opt/zookeeper/myid
zoo2:
echo 2 > /opt/zookeeper/myid
zoo3:
echo 3 > /opt/zookeeper/myid

123依次启动
cd /opt/zookeeper/bin
nohup ./zkServer.sh start ../conf/zoo.cfg &

测试
cd /opt/zookeeper/bin/
./zkCli.sh -server 127.0.0.1:2181
注意必须关闭安全设置和时钟同步,我一开始没有设置直接一堆报错

部署kafka集群

kafka基础概念

Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。
物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据

Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition.

Producer:负责发布消息到Kafka broker

Consumer:消息消费者,向Kafka broker读取消息的客户端。

和zk的关系

zookeeper可以帮助kafka管理broker、consumer。kafka在创建Broker后,向zookeeper注册新的broker信息,实现在服务器正常运行下的水平拓展。

1
2
3
4
5
6
tar xf kafka_2.13-3.1.0.tgz
修改配置文件
vim kafka_2.13-3.1.0/config/server.properties
listeners=192.168.10.181://:9092 # 本机地址的kafka端口
zookeeper.connect=192.168.10.181:2181,192.168.10.182:2181:192.168.10.192:2181
# zoo集群的地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
启动kafka
cd kafka_2.13-3.1.0/bin
./kafka-server-start.sh -daemon ../config/server.properties

测试————登录zookeeper,查看信息broker/ids
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /brokers/ids
[0] # 能看到有一个broker的id

测试————生产者消费者模型
1.kafka里创建topic
cd kafka_2.13-3.1.0/bin
./kafka-topics.sh --create --topic quickstart-events --bootstrap-server 192.168.10.181:9092
# Created topic quickstart-events.

2.查看topic
./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server 192.168.10.181:9092
#Topic: quickstart-events TopicId: mpbHVHMNQtaQEtPHDb7ZMA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
# Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0

3.调用生产者console向topic中写入消息
./kafka-console-producer.sh --topic quickstart-events --bootstrap-server 192.168.10.181:9092
4.同时调用消费者读取消息
./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server 192.168.10.181:9092

多节点做集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
将文件夹直接复制过去
scp -r kafka_2.13-3.1.0/ zoo2:~
scp -r kafka_2.13-3.1.0/ zoo3:~

修改server.properties配置文件,将其中的kafka监听IP改成本机ip
vim kafka_2.13-3.1.0/config/server.properties
listeners=PLAINTEXT://192.168.10.182:9092
listeners=PLAINTEXT://192.168.10.183:9092

修改server.properties配置文件,修改broker.id
不能都为0,因为broker.id是kafka物理服务器的标识
broker.id=1
broker.id=2

启动kafka
cd /root/kafka_2.13-3.1.0/bin
./kafka-server-start.sh -daemon ../config/server.properties

本机查看kafka是否正常运行
ps aux | grep kafka

登录zookeeper查看
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2]

部署filebeat服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
1.安装一个nginx用来被采集日志,先单节点采集,后面再多节点
yum -y install nginx
systemctl enable nginx.service --now

2.kafka创建topic,存放日志数据,指定kafka主机IP/端口
cd kafka_2.13-3.1.0/bin/
./kafka-topics.sh --create --topic nginx-log-topic --bootstrap-server 192.168.10.181:9092,192.168.10.182:9092,192.168.10.183:9092
# Created topic nginx-log-topic.

查询topic
./kafka-topics.sh --describe --topic nginx-log-topic --bootstrap-server 192.168.10.181:9092
#Topic: nginx-log-topic TopicId: 7z1LPHcFSfKRQw-uT9EXgA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
#Topic: nginx-log-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1

3.为nginx主机下载解压filebeat,版本7.13.1
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.13.1-linux-x86_64.tar.gz
tar xf filebeat-7.13.1-linux-x86_64.tar.gz

4.开启filebeat的nginx模块
./filebeat modules enable nginx
# Enabled nginx

5.写一个filebeats_nginx.yaml配置文件
vim filebeats_nginx.yaml
# 指定filebeat模块配置
filebeat.modules:
# 这是一个针对nginx的模块配置
- module: nginx
# 访问日志配置
access:
# 启用访问日志收集
enabled: true
# 指定访问日志的路径
var.paths: ["/var/log/nginx/access.log*"]
# 错误日志配置
error:
# 启用错误日志收集
enabled: true
# 指定错误日志的路径,支持通配符
var.paths: ["/var/log/nginx/error.log*"]

#----------------------------------Kafka 输出配置--------------------------------#
# output到kafka的配置文件
output.kafka:
# 启用Kafka输出
enabled: true
# Kafka集群的主机列表和端口
hosts: ['192.168.10.181:9092', '192.168.10.182:9092','192.168.10.183:9092']
# Kafka的topic名称
topic: 'nginx-log-topic'
# acks值默认为1(leader写入即确认)
required_acks: 1
# 压缩格式,默认为gzip
compression: gzip
# 发送消息的最大字节数,默认为1000000字节
max_message_bytes: 1000000
# 编码格式配置,直接输出message字段内容
codec.format:
string: '%{[message]}'

6.启动filebeat
nohup ./filebeat -e -c filebeats_nginx.yaml

7.测试
尝试消费一下————查看kafka的topic是否有日志
cd ~/kafka_2.13-3.1.0/bin/
./kafka-console-consumer.sh --topic nginx-log-topic --from-beginning --bootstrap-server 192.168.10.181:9092,192.168.10.182:9092,192.168.10.183:9092
登录一下nginx界面

能看到有数据,说明topic已经成功通过filebeat作为生产者存入数据了

部署logstash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
logstash版本7.9.2
1.下载与解压
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.9.2.tar.gz
tar xf logstash-7.9.2.tar.gz
cd logstash-7.9.2/

2.创建一个配置文件nginx.conf
配置input模块,设置从kafka消费
cat > config/nginx.conf << EOF
input{
kafka {
bootstrap_servers => ["192.168.10.181:9092,192.168.10.182:9092 ,192.168.10.183:9092"]
auto_offset_reset => "latest"
consumer_threads => 3
decorate_events => true
topics => ["nginx-log-topic"]
codec => "json"
}
}
output {
elasticsearch {
hosts => ["192.168.10.181:9200"]
index => "kafkalog-%{+YYYY.MM.dd}"
}
}
EOF

注释部分:
# Logstash配置文件注释

# 输入配置部分 (input)
# 该部分定义了Logstash从何处接收数据。在此例中,Logstash将从Kafka集群中接收数据。

# Kafka配置
input {
kafka {
# bootstrap_servers:Kafka集群的地址列表,Logstash将连接到这些地址以获取数据。
# 确保列出了所有Kafka broker的地址和端口。
bootstrap_servers => ["192.168.10.181:9092,192.168.10.182:9092,192.168.10.183:9092"]

# auto_offset_reset:当没有初始偏移量或当前偏移量在日志中不存在时,如何重置偏移量。
# "latest" 表示从最新的消息开始消费。
auto_offset_reset => "latest"

# consumer_threads:消费者线程的数量。这将决定Logstash从Kafka并行读取数据的线程数。
consumer_threads => 3

# decorate_events:如果设置为true,将为每个事件添加一个包含Kafka元数据的字段。
decorate_events => true

# topics:要订阅的Kafka主题列表。
topics => ["nginx-log-topic"]

# codec:用于解码从Kafka接收的数据的编解码器。这里使用JSON编解码器。
codec => "json"
}
}

# 输出配置部分 (output)
# 该部分定义了Logstash将数据发送到哪里。在此例中,Logstash将数据发送到Elasticsearch集群。

# Elasticsearch配置
output {
elasticsearch {
# hosts:Elasticsearch集群的地址列表。Logstash将发送数据到这些地址。
hosts => ["192.168.10.121:9200"]

# index:用于存储数据的Elasticsearch索引名称。这里使用日期模板来动态生成索引名称。
index => "kafkalog-%{+YYYY.MM.dd}"
}
}

3.启动logstash
cd /root/logstash-7.9.2/bin
nohup ./logstash -f ../config/nginx.conf >> logstash.log &

此时会报错,因为咱还没有安装es,自然9200端口是不通的

部署es与kibana服务

elasticsearch是一个实时的,分布式的,可扩展的搜索引擎,它允许进行全文本和结构化搜索以及对日志进行分析。它通常用于索引和搜索大量日志数据,也可以用于搜索许多不同种类的文档。elasticsearch具有三大功能:搜索、分析、存储数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
采用容器化的方式使用es,版本7.9.2

1.创建一个目录用来存es的数据
mkdir -p ~/es_data
chmod 777 ~/es_data

2.安装与配置docker
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
yum -y install docker-ce
cat > /etc/docker/daemon.json << EOF
{
"registry-mirrors": [
"https://hub-mirror.c.163.com",
"https://docker.m.daocloud.io",
"https://ghcr.io",
"https://mirror.baidubce.com",
"https://docker.nju.edu.cn"]
}
EOF
systemctl daemon-reload
systemctl enable docker --now
systemctl restart docker

3.容器化部署es7.9.2
docker run -p 9200:9200 -p 9330:9300 -itd -e "discovery.type=single-node" --name es -v /es_data:/sr/share/elasticsearch/data docker.io/library/elasticsearch:7.9.2
docker ps
#CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
#45ad1655a212 elasticsearch:7.9.2 "/tini -- /usr/local…" 6 seconds ago Up 5 seconds 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9330->9300/tcp, :::9330->9300/tcp es

netstat -tunlp | grep 9200
tcp 0 0 0.0.0.0:9200 0.0.0.0:* LISTEN 18151/docker-proxy

4.容器化部署kibana7.9.2
docker run -p 5601:5601 -it -d --link es -e ELASTICSEARCH_URL=http://192.168.10.181:9200 --name kibana kibana:7.9.2
docker ps | grep kibana
# a60c6020bad3 kibana:7.9.2 "/usr/local/bin/dumb…" About a minute ago Up About a minute 0.0.0.0:5601->5601/tcp, :::5601->5601/tcp kibana
ss -tunlp | grep 5601
#tcp LISTEN 0 128 *:5601 *:* users:(("docker-proxy",pid=18551,fd=4))

5.修改kibana配置文件kibana.yml
docker exec -it kibana /bin/bash
vi config/kibana.yml
修改es地址为实际地址
exit

docker restart kibana

1
2
3
6.登录kibana界面
端口为5601
创建匹配刚刚创建模式的索引

这里忘记给容器做时间同步了,所以时间有点早。
按照这套操作,filebeat在每个节点部署、es也可以做集群

原文作者:王盛

原文链接:https://akemi.zj.cn/2024/06/02/EFK/

发表日期:June 2nd 2024, 8:12:57 pm

更新日期:February 20th 2025, 6:37:27 pm

版权声明:本文采用知识共享署名-非商业性使用 4.0 国际许可协议进行许可

CATALOG
  1. 1. zookeeper简单介绍
  2. 2. 部署zookeeper集群
  3. 3. 部署kafka集群
    1. 3.1. 多节点做集群
  4. 4. 部署filebeat服务
  5. 5. 部署logstash
  6. 6. 部署es与kibana服务