Akemi

EFK日志处理平台

2024/05/17

如果出现无法载图的情况,请检查与github的连通性

日志处理概述

日志的处理有四个过程:采集→处理→存储→展示

ELK

E - Elasticsearch(简称:ES)

L - Logstash——比较重,转日志的能力很强

K - Kibana

Elasticsearch:日志存储和搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。

Logstash:是一个完全开源的工具,他可以对你的日志进行收集、过滤,并将其存储供以后使用(支持动态的从各种数据源搜集数据,并对数据进行过滤、分析、丰富、统一格式等操作。)。

Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。

EFK

因为logstash的占用资源较大,将ELK中的logstash替代为fluentd的替代方案

Fluentd是一个流行的开源数据收集器,我们在 Kubernetes 集群节点上安装 Fluentd,通过获取容器日志文件、过滤和转换日志数据,然后将数据传递到 Elasticsearch 集群,在该集群中对其进行索引和存储。

相关常用的方案:
Logstash(采集、处理)—> ElasticSearch (存储)—>Kibana (展示)
Logstash(采集)—> Logstash(聚合、处理)—> ElasticSearch (存储)—>Kibana (展示)
Filebeat(采集、处理)—> ElasticSearch (存储)—>Kibana (展示)
Filebeat(采集)—> Logstash(聚合、处理)—> ElasticSearch (存储)—>Kibana (展示)
Filebeat(采集)—> Kafka/Redis(消峰) —> Logstash(聚合、处理)—> ElasticSearch (存储)—>Kibana (展示)

日志组件概述

es组件

Elasticsearch 是一个分布式的免费开源搜索和分析引擎,适用于包括文本、数字、地理空间、结构化和非结构化数据等在内的所有类型的数据。Elasticsearch 在 Apache Lucene 的基础上开发而成,由Elasticsearch N.V.(即现在的 Elastic)于 2010 年首次发布。Elasticsearch 以其简单的 REST 风格 API、分布式特性、速度和可扩展性而闻名,是 Elastic Stack 的核心组件

人们通常将 Elastic Stack 称为 ELK Stack(代指 Elasticsearch、Logstash 和 Kibana),目前 Elastic Stack 包括一系列丰富的轻量型数据采集代理,这些代理统称为 Beats,可用来向 Elasticsearch 发送数据。

beats组件

  Beats是一个轻量级日志采集器,Beats家族有6个成员,早期的ELK架构中使用Logstash收集、解析日志,但是Logstash对内存、cpu、io等资源消耗比较高。相比Logstash,Beats所占系统的CPU和内存几乎可以忽略不计。

目前Beats包含六种工具:
1、Packetbeat:网络数据(收集网络流量数据)
2、Metricbeat:指标(收集系统、进程和文件系统级别的CPU和内存使用情况等数据)
3、Filebeat:日志文件(收集文件数据)
4、Winlogbeat:windows事件日志(收集Windows事件日志数据)
5、Auditbeat:审计数据(收集审计日志)
6、Heartbeat:运行时间监控(收集系统运行时的数据)

Filebeat 有两个主要组件:
1.harvester:一个harvester负责读取一个单个文件的内容。harvester逐行读取每个文件,并把这些内容发送到输出。每个文件启动一个harvester。
2.Input:一个input负责管理harvesters,并找到所有要读取的源。如果input类型是log,则input查找驱动器上与已定义的log日志路径匹配的所有文件,并为每个文件启动一个harvester。

工作机制

1.在任何环境下,应用程序都有停机的可能性。 Filebeat 读取并转发日志行,如果中断,则会记住所有事件恢复联机状态时所在位置。保持每个文件的状态,并经常刷新注册表文件中的磁盘状态。状态用于记住harvester正在读取的最后偏移量,并确保发送所有日志行。所以它能保证事件至少传递一次到配置的输出,没有数据丢失。

2.Filebeat带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化

3.FileBeat不会让你的管道超负荷。FileBeat 如果是向 Logstash 传输数据,当 Logstash 忙于处理数据,会通知 FileBeat 放慢读取速度。一旦拥塞得到解决,FileBeat将恢复到原来的速度并继续传播。(动态调整流速

传输方案

可以向es输出,向logstash输出,向kafka输出

1、output.elasticsearch
如果你希望使用 filebeat 直接向 elasticsearch 输出数据,需要配置 output.elasticsearch

1
2
output.elasticsearch:
hosts: ["es IP:9200"]

2、output.logstash
如果使用filebeat向 logstash输出数据,然后由 logstash 再向elasticsearch 输出数据,需要配置 output.logstash。 logstash 和 filebeat 一起工作时,如果 logstash 忙于处理数据,会通知FileBeat放慢读取速度。一旦拥塞得到解决,FileBeat 将恢复到原来的速度并继续传播。这样,可以减少管道超负荷的情况。

1
2
output.logstash:
hosts: ["logstash IP:5044"]

3、output.kafka
如果使用filebeat向kafka输出数据,然后由 logstash 作为消费者拉取kafka中的日志,并再向elasticsearch 输出数据,需要配置 output.logstash

1
2
3
4
output.kafka:
enabled: true
hosts: ["kafka IP:9092"]
topic: elfk8stest

logstash组件

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。Logstash 是一个应用程序日志、事件的传输、处理、管理和搜索的平台。你可以用它来统一对应用程序日志进行收集管理,提供 Web 接口用于查询和统计。

Logstash 有两个必要元素:input 和 output ,一个可选元素:filter。 这三个元素,分别代表 Logstash 事件处理的三个阶段:输入 > 过滤器 > 输出

1.Input负责从数据源采集数据。Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从你的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
2.filter 将数据修改为你指定的格式或内容,Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:
3.output 将数据传输到目的地。Logstash 提供众多输出选择,你可以将数据发送到你要指定的地方。

在实际应用场景中,通常输入、输出、过滤器不止一个。Logstash 的这三个元素都使用插件式管理方式,可以根据应用需要,灵活的选用各阶段需要的插件,并组合使用。

常用input模块

Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,可从日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

file:从文件系统上的文件读取
syslog:在众所周知的端口514上侦听系统日志消息,并根据RFC3164格式进行解析
redis:从redis服务器读取,使用redis通道和redis列表。 Redis经常用作集中式Logstash安装中的“代理”,它将接收来自远程Logstash“托运人”的Logstash事件排队。
beats:处理由Filebeat发送的事件。

常用的filter模块

过滤器是Logstash管道中的中间处理设备。可以将条件过滤器组合在一起,对事件执行操作。
grok:解析和结构任意文本。 Grok目前是Logstash中将非结构化日志数据解析为结构化和可查询的最佳方法。
mutate:对事件字段执行一般转换。可以重命名,删除,替换和修改事件中的字段。
drop:完全放弃一个事件,例如调试事件。
clone:制作一个事件的副本,可能会添加或删除字段。
geoip:添加有关IP地址的地理位置的信息

常用output

elasticsearch:将事件数据发送给 Elasticsearch(推荐模式)。
file:将事件数据写入文件或磁盘。
graphite:将事件数据发送给 graphite(一个流行的开源工具,存储和绘制指标, http://graphite.readthedocs.io/en/latest/)。
statsd:将事件数据发送到 statsd (这是一种侦听统计数据的服务,如计数器和定时器,通过UDP发送并将聚合发送到一个或多个可插入的后端服务)。

常用code插件

json:以JSON格式对数据进行编码或解码。
multiline:将多行文本事件(如java异常和堆栈跟踪消息)合并为单个事件。

fluentd组件

fluentd是一个针对日志的收集、处理、转发系统。通过丰富的插件系统,可以收集来自于各种系统或应用的日志,转化为用户指定的格式后,转发到用户所指定的日志存储系统之中。
fluentd 常常被拿来和Logstash比较,我们常说EFK,F就是这个agent。fluentd 是随着Docker和es一起流行起来的agent。

Fluentd 创建的初衷主要是尽可能的使用 JSON 作为日志输出,所以传输工具及其下游的传输线不需要猜测子字符串里面各个字段的类型。这样,它为几乎所有的语言都提供库,这也意味着,我们可以将它插入到我们自定义的程序中。

  • fluentd 比 logstash 更省资源;
  • 更轻量级的 fluent-bid 对应 filebeat,作为部署在结点上的日志收集器;
  • fluentd 有更多强大、开放的插件数量和社区。插件多,也非常灵活,规则也不复杂。

日志采集工具对比分析

常见的日志采集工具有Logstash、Filebeat、Fluentd、Logagent、rsyslog等

Logstash

优势——功能强大

Logstash 主要的优点就是它的灵活性,主要因为它有很多插件,详细的文档以及直白的配置格式让它可以在多种场景下应用。我们基本上可以在网上找到很多资源,几乎可以处理任何问题。

劣势

Logstash 致命的问题是它的性能以及资源消耗(默认的堆大小是 1GB)。尽管它的性能在近几年已经有很大提升,与它的替代者们相比还是要慢很多的。这里有 Logstash 与 rsyslog 性能对比以及Logstash 与 filebeat 的性能对比。它在大数据量的情况下会是个问题。

另一个问题是它目前不支持缓存,目前的典型替代方案是将 Redis 或 Kafka 作为中心缓冲池:

应用场景

因为 Logstash 自身的灵活性以及网络上丰富的资料,Logstash 适用于原型验证阶段使用,或者解析非常的复杂的时候。在不考虑服务器资源的情况下,如果服务器的性能足够好,我们也可以为每台服务器安装 Logstash 。我们也不需要使用缓冲,因为文件自身就有缓冲的行为,而 Logstash 也会记住上次处理的位置。
当判断 Logstash 的性能是否足够好时,重要的是对吞吐量的需求有着准确的估计,这也决定了需要为 Logstash 投入多少硬件资源。

Filebeat

轻量级,也会和 Logstash 一样记住上次读取的偏移

优势

Filebeat 只是一个二进制文件没有任何依赖。它占用资源极少,尽管它还十分年轻,正式因为它简单,所以几乎没有什么可以出错的地方,所以它的可靠性高。它也为我们提供了很多可以调节的点,例如:它以何种方式搜索新的文件,以及当文件有一段时间没有发生变化时,何时选择关闭文件句柄。

劣势

Filebeat 的应用范围十分有限,所以在某些场景下我们会碰到问题。例如,如果使用 Logstash 作为下游管道,我们同样会遇到性能问题。正因为如此,Filebeat 的范围在扩大。开始时,它只能将日志发送到 Logstash 和 Elasticsearch,而现在它可以将日志发送给 Kafka 和 Redis,在 5.x 版本中,它还具备过滤的能力。

Fluentd

优势

和多数 Logstash 插件一样,Fluentd 插件是用 Ruby 语言开发的非常易于编写维护。所以它数量很多,几乎所有的源和目标存储都有插件(各个插件的成熟度也不太一样)。这也意味这我们可以用 Fluentd 来串联所有的东西。

劣势

因为在多数应用场景下,我们会通过 Fluentd 得到结构化的数据,它的灵活性并不好。但是我们仍然可以通过正则表达式,来解析非结构化的数据。尽管,性能在大多数场景下都很好,但它并不是最好的,和 syslog-ng 一样,它的缓冲只存在与输出端,单线程核心以及 Ruby GIL 实现的插件意味着它大的节点下性能是受限的,不过,它的资源消耗在大多数场景下是可以接受的。对于小的或者嵌入式的设备,可能需要看看 Fluent Bit

应用场景

Fluentd 在日志的数据源和目标存储各种各样时非常合适,因为它有很多插件。而且,如果大多数数据源都是自定义的应用,可以发现用 fluentd 的库要比将日志库与其他传输工具结合起来要容易很多

logagent

Logagent 是 Sematext 提供的传输工具,它用来将日志传输到 Logsene(一个基于 SaaS 平台的 Elasticsearch API),因为 Logsene 会暴露 Elasticsearch API,所以 Logagent 可以很容易将数据推送到 Elasticsearch 。

优势

可以获取 /var/log 下的所有信息,解析各种格式(Elasticsearch,Solr,MongoDB,Apache HTTPD等等),它可以掩盖敏感的数据信息,例如,个人验证信息(PII),出生年月日,信用卡号码,等等。它还可以基于 IP 做 GeoIP 丰富地理位置信息(例如,access logs)。同样,它轻量又快速,可以将其置入任何日志块中。在新的 2.0 版本中,它以第三方 node.js 模块化方式增加了支持对输入输出的处理插件。重要的是Logagent 有本地缓冲,所以不像 Logstash ,在数据传输目的地不可用时会丢失日志。

劣势

尽管 Logagent 有些比较有意思的功能(例如,接收 Heroku 或 CloudFoundry 日志),但是它并没有 Logstash 灵活。插件不如他多

logtail

阿里云日志服务的生产者,目前在阿里集团内部机器上运行,经过3年多时间的考验,目前为阿里公有云用户提供日志收集服务。

采用C++语言实现,对稳定性、资源控制、管理等下过很大的功夫,性能良好。相比于logstash、fluentd的社区支持,logtail功能较为单一,专注日志收集功能。

优势
logtail占用机器cpu、内存资源最少,结合阿里云日志服务的E2E体验良好。

劣势
logtail目前对特定日志类型解析的支持较弱,后续需要把这一块补起来。

rsyslog

绝大多数 Linux 发布版本默认的 syslog 守护进程,rsyslog 可以做的不仅仅是将日志从 syslog socket 读取并写入 /var/log/messages 。它可以提取文件、解析、缓冲(磁盘和内存)以及将它们传输到多个目的地,包括 Elasticsearch 。可以从此处找到如何处理 Apache 以及系统日志。

优势

rsyslog 是经测试过的最快的传输工具,最轻的

劣势

配置相对困难,文档难找,并且有bug

典型应用场景

rsyslog 适合那些非常轻的应用(应用,小VM,Docker容器)。如果需要在另一个传输工具(例如,Logstash)中进行处理,可以直接通过 TCP 转发 JSON ,或者连接 Kafka/Redis 缓冲。

EFK组件部署

es部署(nfs-provisioner)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
1.创建一个ns专门放efk的相关pod,叫做kube-logging
kubectl apply -f kube-logging.yaml

2.创建es对应的svc
cat elasticsearch_svc.yaml
kind: Service
apiVersion: v1
metadata:
name: elasticsearch
namespace: kube-logging
labels:
app: elasticsearch
spec:
selector:
app: elasticsearch # 标签
clusterIP: None
ports: # 两个端口
- port: 9200
name: rest
- port: 9300
name: inter-node

get svc -n kube-logging
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
elasticsearch ClusterIP None <none> 9200/TCP,9300/TCP 15s

解释下为什么ClusterIP为None,无头服务handless service,在后端使用statefulset作为pod时
不需要实际的ClusterIP进行通信,可以直接通过dns来访问,

3.创建sa,rbac,并且绑定
kubectl create clusterrolebinding nfs-provisioner-clusterrolebinding --clusterrole=cluster-admin --serviceaccount=default:nfs-provisioner

4.创建存储类 使用nfs供应商动态提供Storageclass
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: do-block-storage
provisioner: example.com/nfs

5.使用nfs供应商创建
kind: Deployment
apiVersion: apps/v1
metadata:
name: nfs-provisioner
spec:
selector:
matchLabels:
app: nfs-provisioner
replicas: 1
strategy:
type: Recreate
template:
metadata:
labels:
app: nfs-provisioner
spec:
serviceAccount: nfs-provisioner
containers:
- name: nfs-provisioner
image: registry.cn-beijing.aliyuncs.com/mydlq/nfs-subdir-external-provisioner:v4.0.0
imagePullPolicy: IfNotPresent
volumeMounts:
- name: nfs-client-root
mountPath: /persistentvolumes
env:
- name: PROVISIONER_NAME
value: example.com/nfs
- name: NFS_SERVER
value: 192.168.10.121
- name: NFS_PATH
value: /data/v1
volumes:
- name: nfs-client-root
nfs:
server: 192.168.10.121
path: /data/v1

6.做三个es,用statefulset创建,es版本7.12.1
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: es-cluster
namespace: kube-logging
spec:
serviceName: elasticsearch
replicas: 3
selector:
matchLabels:
app: elasticsearch
template:
metadata:
labels:
app: elasticsearch
spec:
containers:
- name: elasticsearch
image: elasticsearch:7.12.1
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 1000m
requests:
cpu: 100m
ports:
- containerPort: 9200
name: rest
protocol: TCP
- containerPort: 9300
name: inter-node
protocol: TCP
volumeMounts:
- name: data
mountPath: /usr/share/elasticsearch/data
env:
- name: cluster.name
value: k8s-logs
- name: node.name
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: discovery.seed_hosts
value: "es-cluster-0.elasticsearch,es-cluster-1.elasticsearch,es-cluster-2.elasticsearch"
- name: cluster.initial_master_nodes
value: "es-cluster-0,es-cluster-1,es-cluster-2"
- name: ES_JAVA_OPTS
value: "-Xms512m -Xmx512m"
initContainers:
- name: fix-permissions
image: busybox
imagePullPolicy: IfNotPresent
command: ["sh", "-c", "chown -R 1000:1000 /usr/share/elasticsearch/data"]
securityContext:
privileged: true
volumeMounts:
- name: data
mountPath: /usr/share/elasticsearch/data
- name: increase-vm-max-map
image: busybox
imagePullPolicy: IfNotPresent
command: ["sysctl", "-w", "vm.max_map_count=262144"]
securityContext:
privileged: true
- name: increase-fd-ulimit
image: busybox
imagePullPolicy: IfNotPresent
command: ["sh", "-c", "ulimit -n 65536"]
securityContext:
privileged: true
volumeClaimTemplates:
- metadata:
name: data
labels:
app: elasticsearch
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: do-block-storage
resources:
requests:
storage: 10Gi

nfs-provisioner供应商的sa和rbac

sa部分

1
2
3
4
apiVersion: v1
kind: ServiceAccount
metadata:
name: nfs-provisioner

rbac部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: nfs-provisioner-runner
rules:
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch", "update"]

- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "update", "patch"]
- apiGroups: [""]
resources: ["services", "endpoints"]
verbs: ["get"]
- apiGroups: ["extensions"]
resources: ["podsecuritypolicies"]
resourceNames: ["nfs-provisioner"]
verbs: ["use"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: run-nfs-provisioner
subjects:
- kind: ServiceAccount
name: nfs-provisioner
namespace: default
roleRef:
kind: ClusterRole
name: nfs-provisioner-runner
apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: leader-locking-nfs-provisioner
rules:
- apiGroups: [""]
resources: ["endpoints"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: leader-locking-nfs-provisioner
subjects:
- kind: ServiceAccount
name: nfs-provisioner
namespace: default
roleRef:
kind: Role
name: leader-locking-nfs-provisioner
apiGroup: rbac.authorization.k8s.io

kibana部署

kibana版本7.12.1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
apiVersion: v1
kind: Service
metadata:
name: kibana
namespace: kube-logging
labels:
app: kibana
spec:
type: NodePort
ports:
- port: 5601
selector:
app: kibana
---
kind: Deployment
metadata:
name: kibana
namespace: kube-logging
labels:
app: kibana
spec:
replicas: 1
selector:
matchLabels:
app: kibana
template:
metadata:
labels:
app: kibana
spec:
containers:
- name: kibana
image: kibana:7.12.1
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 1000m
requests:
cpu: 100m
env:
- name: ELASTICSEARCH_URL
value: http://elasticsearch:9200
ports:
- containerPort: 5601

kubectl get svc -n kube-logging
#NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
#elasticsearch ClusterIP None <none> 9200/TCP,9300/TCP 29h
#kibana NodePort 10.108.31.74 <none> 5601:32075/TCP 26h

fluentd部署

fluentd镜像为docker.io/fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch7-1
装了针对k8s进行日志收集的插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
用daemonset安装,因为要在所有节点上都运行以采集日志
fluentd版本1.4.2

apiVersion: v1 # sa
kind: ServiceAccount
metadata:
name: fluentd
namespace: kube-logging
labels:
app: fluentd
---
apiVersion: rbac.authorization.k8s.io/v1 # rbac
kind: ClusterRole
metadata:
name: fluentd
labels:
app: fluentd
rules:
- apiGroups:
- ""
resources:
- pods
- namespaces
verbs:
- get
- list
- watch
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: fluentd
roleRef:
kind: ClusterRole
name: fluentd
apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
name: fluentd
namespace: kube-logging
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd
namespace: kube-logging
labels:
app: fluentd
spec:
selector:
matchLabels:
app: fluentd
template:
metadata:
labels:
app: fluentd
spec:
serviceAccount: fluentd
serviceAccountName: fluentd
tolerations:
- key: node-role.kubernetes.io/control-plane
effect: NoSchedule
containers:
- name: fluentd
image: docker.io/fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch7-1
imagePullPolicy: IfNotPresent
env:
- name: FLUENT_ELASTICSEARCH_HOST
value: "elasticsearch.kube-logging.svc.cluster.local"
- name: FLUENT_ELASTICSEARCH_PORT
value: "9200"
- name: FLUENT_ELASTICSEARCH_SCHEME
value: "http"
- name: FLUENTD_SYSTEMD_CONF
value: disable
- name: FLUENT_CONTAINER_TAIL_PARSER_TYPE
value: "cri"
- name: FLUENT_CONTAINER_TAIL_PARSER_TIME_FORMAT
value: "%Y-%m-%dT%H:%M:%S.%L%z"
resources:
limits:
memory: 512Mi
requests:
cpu: 100m
memory: 200Mi
volumeMounts:
- name: varlog
mountPath: /var/log
- name: containers
mountPath: /var/log/containers
readOnly: true
terminationGracePeriodSeconds: 30
volumes:
- name: varlog
hostPath:
path: /var/log
- name: containers
hostPath:
path: /var/log/containers

kubectl apply -f fluentd.yaml

kubectl get pods -n kube-logging -owide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
es-cluster-0 1/1 Running 0 30h 10.244.193.162 ws-k8s-node3 <none> <none>
es-cluster-1 1/1 Running 0 30h 10.244.179.57 ws-k8s-node1 <none> <none>
es-cluster-2 1/1 Running 0 30h 10.244.234.86 ws-k8s-node2 <none> <none>
fluentd-7ckcr 1/1 Running 0 10s 10.244.250.2 ws-k8s-master2 <none> <none>
fluentd-h5zjc 1/1 Running 0 18s 10.244.58.66 ws-k8s-master3 <none> <none>
fluentd-pzd6f 1/1 Running 0 2m55s 10.244.234.87 ws-k8s-node2 <none> <none>
fluentd-q9pgw 1/1 Running 0 2m55s 10.244.193.164 ws-k8s-node3 <none> <none>
fluentd-whmzq 1/1 Running 0 2m55s 10.244.179.58 ws-k8s-node1 <none> <none>
fluentd-zhlzq 1/1 Running 0 2m55s 10.244.189.207 ws-k8s-master1 <none> <none>
kibana-64955889c5-xll66 1/1 Running 0 28h 10.244.193.163 ws-k8s-node3 <none> <none>

使用EFK

进入kibana,选择discover发现,
Create index patten创建索引模式
输入logstash-*(正则匹配)对已经采集到的日志进行筛选
选择timestamp时间戳

elasticsearch+fluentd+kibana+kafka+logstash

es和kibana配置与上文相同,fluentd需要配置额外的config,并且在daemonset中挂载新创建的config

fluentd配置文件

fluentd-configmap.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
vim fluentd-configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
name: fluentd-config
namespace: kube-logging
labels:
addonmanager.kubernetes.io/mode: Reconcile
data:
system.conf: |-
<system>
root_dir /tmp/fluentd-buffers/
</system>
containers.input.conf: |-
<source>
@id fluentd-containers.log
@type tail
path /var/log/containers/*.log
pos_file /var/log/es-containers.log.pos
time_format %Y-%m-%dT%H:%M:%S.%NZ
localtime
tag raw.kubernetes.*
format json
read_from_head true
</source>
# Detect exceptions in the log output and forward them as one log entry.
<match raw.kubernetes.**>
@id raw.kubernetes
@type detect_exceptions
remove_tag_prefix raw
message log
stream stream
multiline_flush_interval 5
max_bytes 500000
max_lines 1000
</match>
system.input.conf: |-
# Logs from systemd-journal for interesting services.
<source>
@id journald-docker
@type systemd
filters [{ "_SYSTEMD_UNIT": "docker.service" }]
<storage>
@type local
persistent true
</storage>
read_from_head true
tag docker
</source>
<source>
@id journald-kubelet
@type systemd
filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
<storage>
@type local
persistent true
</storage>
read_from_head true
tag kubelet
</source>
forward.input.conf: |-
# Takes the messages sent over TCP
<source>
@type forward
</source>
output.conf: |-
# Enriches records with Kubernetes metadata
<filter kubernetes.**>
@type kubernetes_metadata
</filter>
<match **>
@id elasticsearch
@type elasticsearch
@log_level info
include_tag_key true
host elasticsearch.kube-logging.svc.cluster.local
port 9200
logstash_format true
request_timeout 30s
<buffer>
@type file
path /var/log/fluentd-buffers/kubernetes.system.buffer
flush_mode interval
retry_type exponential_backoff
flush_thread_count 2
flush_interval 5s
retry_forever
retry_max_interval 30
chunk_limit_size 2M
queue_limit_length 8
overflow_action block
</buffer>
</match>

fluentd-daemonset.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
vim fluentd-daemonset.yaml 
apiVersion: v1
kind: ServiceAccount
metadata:
name: fluentd-es
namespace: kube-logging
labels:
k8s-app: fluentd-es
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: fluentd-es
labels:
k8s-app: fluentd-es
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
- ""
resources:
- "namespaces"
- "pods"
verbs:
- "get"
- "watch"
- "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: fluentd-es
labels:
k8s-app: fluentd-es
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
name: fluentd-es
namespace: kube-logging
apiGroup: ""
roleRef:
kind: ClusterRole
name: fluentd-es
apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd-es
namespace: kube-logging
labels:
k8s-app: fluentd-es
version: v2.0.4
kubernetes.io/cluster-service: "true"
addonmanager.kubernetes.io/mode: Reconcile
spec:
selector:
matchLabels:
k8s-app: fluentd-es
version: v2.0.4
template:
metadata:
labels:
k8s-app: fluentd-es
kubernetes.io/cluster-service: "true"
version: v2.0.4
# This annotation ensures that fluentd does not get evicted if the node
# supports critical pod annotation based priority scheme.
# Note that this does not guarantee admission on the nodes (#40573).
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ''
spec:
serviceAccountName: fluentd-es
containers:
- name: fluentd-es
image: cnych/fluentd-elasticsearch:v2.0.4
env:
- name: FLUENTD_ARGS
value: --no-supervisor -q
resources:
limits:
memory: 500Mi
requests:
cpu: 100m
memory: 200Mi
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
- name: config-volume
mountPath: /etc/fluent/config.d
nodeSelector:
beta.kubernetes.io/fluentd-ds-ready: "true"
tolerations:
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
terminationGracePeriodSeconds: 30
volumes:
- name: varlog
hostPath:
path: /var/log
- name: varlibdockercontainers
hostPath:
path: /var/lib/docker/containers
- name: config-volume
configMap:
name: fluentd-config

为master节点添加标签

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
kubectl label nodes ws-k8s-master1 beta.kubernetes.io/fluentd-ds-ready=true
kubectl label nodes ws-k8s-master2 beta.kubernetes.io/fluentd-ds-ready=true
kubectl label nodes ws-k8s-master3 beta.kubernetes.io/fluentd-ds-ready=true
kubectl label nodes ws-k8s-node1 beta.kubernetes.io/fluentd-ds-ready=true
kubectl label nodes ws-k8s-node2 beta.kubernetes.io/fluentd-ds-ready=true
kubectl label nodes ws-k8s-node3 beta.kubernetes.io/fluentd-ds-ready=true

kubectl get pods -n kube-logging
NAME READY STATUS RESTARTS AGE
es-cluster-0 1/1 Running 0 13m
es-cluster-1 1/1 Running 0 12m
es-cluster-2 1/1 Running 0 12m
fluentd-es-79hf2 1/1 Running 0 2s
fluentd-es-bbtk6 1/1 Running 0 4m8s
fluentd-es-dd47d 1/1 Running 0 4m31s
fluentd-es-kz8sg 1/1 Running 0 4m7s
fluentd-es-mz5ll 1/1 Running 0 4m9s
fluentd-es-wknz8 1/1 Running 0 4m9s
kibana-64955889c5-gwqgx 1/1 Running 1 (11m ago) 13m

fluentd接入kafka —— kafka-config.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
vim kafka-config.yaml
kind: ConfigMap
apiVersion: v1
metadata:
name: fluentd-config
namespace: kube-logging
labels:
addonmanager.kubernetes.io/mode: Reconcile
data:
system.conf: |-
<system>
root_dir /tmp/fluentd-buffers/
</system>
containers.input.conf: |-
<source>
@id fluentd-containers.log
@type tail
path /var/log/containers/*.log
pos_file /var/log/es-containers.log.pos
time_format %Y-%m-%dT%H:%M:%S.%NZ
localtime
tag raw.kubernetes.*
format json
read_from_head true
</source>
# Detect exceptions in the log output and forward them as one log entry.
<match raw.kubernetes.**>
@id raw.kubernetes
@type detect_exceptions
remove_tag_prefix raw
message log
stream stream
multiline_flush_interval 5
max_bytes 500000
max_lines 1000
</match>
system.input.conf: |-
# Logs from systemd-journal for interesting services.
<source>
@id journald-docker
@type systemd
filters [{ "_SYSTEMD_UNIT": "docker.service" }]
<storage>
@type local
persistent true
</storage>
read_from_head true
tag docker
</source>
<source>
@id journald-kubelet
@type systemd
filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
<storage>
@type local
persistent true
</storage>
read_from_head true
tag kubelet
</source>
forward.input.conf: |-
# Takes the messages sent over TCP
<source>
@type forward
</source>
output.conf: |-
# Enriches records with Kubernetes metadata
<filter kubernetes.**>
@type kubernetes_metadata
</filter>
<match **>
@id kafka
@type kafka2
@log_level info
include_tag_key true
# list of seed brokers
brokers 192.168.10:9092
use_event_time true
# buffer settings
<buffer>
@type file
path /var/log/fluentd-buffers/kubernetes.system.buffer
flush_mode interval
retry_type exponential_backoff
flush_thread_count 2
flush_interval 5s
retry_forever
retry_max_interval 30
chunk_limit_size 2M
queue_limit_length 8
overflow_action block
</buffer>
# data type settings
<format>
@type json
</format>
# topic settings
topic_key topic
default_topic messages
# producer settings
required_acks -1
compression_codec gzip
</match>

重启fluentd

部署与配置logstash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
安装logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.7.0-linux-x86_64.tar.gz
tar -zxvf logstash-8.7.0-linux-x86_64.tar.gz

测试
bin/logstash -e 'input { stdin { } } output { stdout {} }'

配置logstash消费kafka,并写入es
cd logstash-8.7.0/
cat > config/kafkaInput_fluentd.conf << EOF
input {
kafka {
bootstrap_servers => ["192.168.10.100:9092"]
client_id => "fluentd"
group_id => "fluentd"
consumer_threads => 1
auto_offset_reset => "latest"
topics => ["messages"]
}
}

filter {
json{
source => "message"
}

ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
ruby {
code => "event.set('find_time',event.get('@timestamp').time.localtime - 8*60*60)"
}
mutate {
remove_field => ["timestamp"]
remove_field => ["message"]
}

}
output {
elasticsearch{
hosts => ["elasticsearch.kube-logging.svc.cluster.local:9200"]
index => "kubernetes_%{+YYYY_MM_dd}"

}
# stdout {
# codec => rubydebug
# }
}
EOF

我的kafka位置192.168.10.100:9092
es IP为elasticsearch.kube-logging.svc.cluster.local:9200

启动logstash
nohup ./bin/logstash -f config/kafkaInput_fluentd.conf \
--config.reload.automatic --path.data=/opt/logstash/data_fluentd 2>&1 > fluentd.log &

CATALOG
  1. 1. 日志处理概述
    1. 1.1. ELK
    2. 1.2. EFK
  2. 2. 日志组件概述
    1. 2.1. es组件
    2. 2.2. beats组件
      1. 2.2.1. 工作机制
      2. 2.2.2. 传输方案
    3. 2.3. logstash组件
      1. 2.3.1. 常用input模块
      2. 2.3.2. 常用的filter模块
      3. 2.3.3. 常用output
      4. 2.3.4. 常用code插件
    4. 2.4. fluentd组件
  3. 3. 日志采集工具对比分析
    1. 3.1. Logstash
      1. 3.1.1. 优势——功能强大
      2. 3.1.2. 劣势
      3. 3.1.3. 应用场景
    2. 3.2. Filebeat
      1. 3.2.1. 优势
      2. 3.2.2. 劣势
    3. 3.3. Fluentd
      1. 3.3.1. 优势
      2. 3.3.2. 劣势
      3. 3.3.3. 应用场景
    4. 3.4. logagent
    5. 3.5. logtail
    6. 3.6. rsyslog
  4. 4. EFK组件部署
    1. 4.1. es部署(nfs-provisioner)
      1. 4.1.1. nfs-provisioner供应商的sa和rbac
    2. 4.2. kibana部署
    3. 4.3. fluentd部署
    4. 4.4. 使用EFK
  5. 5. elasticsearch+fluentd+kibana+kafka+logstash
    1. 5.1. fluentd配置文件
      1. 5.1.1. fluentd-configmap.yaml
      2. 5.1.2. fluentd-daemonset.yaml
      3. 5.1.3. fluentd接入kafka —— kafka-config.yaml
    2. 5.2. 部署与配置logstash