搜索中...
🔍

未找到相关结果

Akemi

两周复习EFK第四天之数据深加工-时间轴纠偏与Mutate字段强转

2026/03/25

data插件-时间轴纠偏的意义

1
2
3
4
5
6
7
8
9
10
1.断点续传
默认情况下,ES使用写入时间作为@timestamp,也就是Kibana 的右侧时间筛选器里,默认绑定的是 @timestamp
因为现有的Filebeat 和 Logstash 性能很好,日志产生后 0.1 秒就传到了 ES。所以"发生时间""写入时间"几乎重合,在 Kibana 上感觉不出差别,但如果你的 Logstash 停机维护了 2 小时。重启后,Filebeat 会把积压的 2 小时日志瞬间发过来
这 2 小时的陈旧日志,在 Kibana 看来全都是"刚刚产生的"

2.补录历史数据
如果要把去年的备份日志导入 ES 做审计,没有 date 插件,这些日志全都会显示为"今天"

3.时区问题
Elasticsearch 内部强制使用 UTC(0时区) 存储时间

Mutate插件-字段强转的意义

1
2
3
4
5
6
7
8
9
10
11
12
1."展示""计算"
Grok 插件默认抓取的所有内容都是 String
如果你不强转,"port": "8888""severity": "3" 在 ES 看来和 "root" 这个用户名没有区别
也就无法在Kibana里对它们做任何数学运算:不能算平均值、不能算总和、不能做范围过滤

2.节省存储与查询性能
在底层的 Lucene 引擎中,存储一个整数比存储一个等值的字符串要高效得多

3.防止"制图失败"
当你尝试在 Kibana 里画一个折线图来展示"每分钟平均严重程度"时,如果你没做强转,Kibana 会直接报错,或者在选择指标(Metrics)时,下拉菜单里根本找不到 severity 这个字段。

Grok 只是把字符串"切"出来,而 Mutate 的 convert 才是给这些碎片赋予"灵魂"(数据类型)

logstash配置修改

/etc/logstash/conf.d/ssh_filter.conf

(真实操作时,json不支持注释)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 原配置
filter {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} sshd\[\d+\]: %{WORD:auth_result} password for %{USER:username} from %{IP:src_ip} port %{NUMBER:port}" }
}
if [auth_result] == "Failed" {
mutate {
add_field => { "event_type" => "ssh_brute_force" }
replace => { "severity" => 3 }
}
}
}

# 修改配置
filter {
# 1. 基础拆分(保持你原有的逻辑,但注意字段名我们统称为 timestamp)
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} sshd\[\d+\]: %{WORD:auth_result} password for %{USER:username} from %{IP:src_ip} port %{NUMBER:port}" }
}

# 2. 使用data插件,让kibana中的时间轴以日志里的时间为准
date {
# 匹配 timestamp 字段中的内容,尽可能匹配多种格式
match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
# 显式指定时区
timezone => "Asia/Shanghai"
}
# 3.把字符串变成数字
mutate {
convert => {
"port" => "integer" # 强转端口为整数,方便以后搜 port > 1024
"severity" => "integer" # 强转严重性为整数,方便以后算平均值
}
}

if [auth_result] == "Failed" {
mutate {
add_field => { "event_type" => "ssh_brute_force" }
replace => { "severity" => 3 }
}
}
}

测试过去的日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
echo "Mar 22 14:30:05 1panel sshd[12345]: Failed password for root from 192.168.10.200 port 2222 ssh2" >> /var/log/secure

api请求es,查询这条日志
# 搜索23号端口号大于2000的所有安全日志
GET /syslog-security-2026.03.22/_search
{
"query": {
"range": {
"port": {
"gt": 2000
}
}
}
}
返回
...
"@version": "1",
"message": "Mar 22 14:30:05 1panel sshd[12345]: Failed password for root from 192.168.10.200 port 2222 ssh2",
"src_ip": "192.168.10.200",
"port": 2222

如果能搜到这条日志,就说明这条历史日志已经被正确存放到了当天3.22的索引中了
同时也可以看到这条日志的port已经不是""字符串了,而是整型
CATALOG
  1. 1. data插件-时间轴纠偏的意义
  2. 2. Mutate插件-字段强转的意义
  3. 3. logstash配置修改
  4. 4. 测试过去的日志