data插件-时间轴纠偏的意义
1 2 3 4 5 6 7 8 9 10
| 1.断点续传 默认情况下,ES使用写入时间作为@timestamp,也就是Kibana 的右侧时间筛选器里,默认绑定的是 @timestamp 因为现有的Filebeat 和 Logstash 性能很好,日志产生后 0.1 秒就传到了 ES。所以"发生时间"和"写入时间"几乎重合,在 Kibana 上感觉不出差别,但如果你的 Logstash 停机维护了 2 小时。重启后,Filebeat 会把积压的 2 小时日志瞬间发过来 这 2 小时的陈旧日志,在 Kibana 看来全都是"刚刚产生的"
2.补录历史数据 如果要把去年的备份日志导入 ES 做审计,没有 date 插件,这些日志全都会显示为"今天"。
3.时区问题 Elasticsearch 内部强制使用 UTC(0时区) 存储时间
|
Mutate插件-字段强转的意义
1 2 3 4 5 6 7 8 9 10 11 12
| 1. 从"展示"到"计算" Grok 插件默认抓取的所有内容都是 String 如果你不强转,"port": "8888" 和 "severity": "3" 在 ES 看来和 "root" 这个用户名没有区别 也就无法在Kibana里对它们做任何数学运算:不能算平均值、不能算总和、不能做范围过滤
2.节省存储与查询性能 在底层的 Lucene 引擎中,存储一个整数比存储一个等值的字符串要高效得多
3.防止"制图失败" 当你尝试在 Kibana 里画一个折线图来展示"每分钟平均严重程度"时,如果你没做强转,Kibana 会直接报错,或者在选择指标(Metrics)时,下拉菜单里根本找不到 severity 这个字段。
Grok 只是把字符串"切"出来,而 Mutate 的 convert 才是给这些碎片赋予"灵魂"(数据类型)
|
logstash配置修改
/etc/logstash/conf.d/ssh_filter.conf
(真实操作时,json不支持注释)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| # 原配置 filter { grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} sshd\[\d+\]: %{WORD:auth_result} password for %{USER:username} from %{IP:src_ip} port %{NUMBER:port}" } } if [auth_result] == "Failed" { mutate { add_field => { "event_type" => "ssh_brute_force" } replace => { "severity" => 3 } } } }
# 修改配置 filter { # 1. 基础拆分(保持你原有的逻辑,但注意字段名我们统称为 timestamp) grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} sshd\[\d+\]: %{WORD:auth_result} password for %{USER:username} from %{IP:src_ip} port %{NUMBER:port}" } }
# 2. 使用data插件,让kibana中的时间轴以日志里的时间为准 date { # 匹配 timestamp 字段中的内容,尽可能匹配多种格式 match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] # 显式指定时区 timezone => "Asia/Shanghai" } # 3.把字符串变成数字 mutate { convert => { "port" => "integer" # 强转端口为整数,方便以后搜 port > 1024 "severity" => "integer" # 强转严重性为整数,方便以后算平均值 } }
if [auth_result] == "Failed" { mutate { add_field => { "event_type" => "ssh_brute_force" } replace => { "severity" => 3 } } } }
|
测试过去的日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| echo "Mar 22 14:30:05 1panel sshd[12345]: Failed password for root from 192.168.10.200 port 2222 ssh2" >> /var/log/secure
api请求es,查询这条日志 # 搜索23号端口号大于2000的所有安全日志 GET /syslog-security-2026.03.22/_search { "query": { "range": { "port": { "gt": 2000 } } } } 返回 ... "@version": "1", "message": "Mar 22 14:30:05 1panel sshd[12345]: Failed password for root from 192.168.10.200 port 2222 ssh2", "src_ip": "192.168.10.200", "port": 2222
如果能搜到这条日志,就说明这条历史日志已经被正确存放到了当天3.22的索引中了 同时也可以看到这条日志的port已经不是""字符串了,而是整型
|