创建一个索引模板
昨天手动put了syslog-security-2026.03.19的索引格式,但一条一条添加肯定不行
以后任何以 syslog-security- 开头的索引,都会自动变成 3 分片、1 副本,并且 src_ip 永远是 ip 类型。

部署与使用Logstash
支持input→filter→ouput的工作流,特别是filter功能可以识别IP、时间格式对数据进行拆分,输出规整的精美json
参考文档:
二进制部署:https://www.elastic.co/docs/reference/logstash/installing-logstash
目前我们是初学者,就先用二进制部署,用helm部署的k8s版本就再说吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elastic-keyring.gpg sudo apt-get install apt-transport-https echo "deb [signed-by=/usr/share/keyrings/elastic-keyring.gpg] https://artifacts.elastic.co/packages/9.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-9.x.list sudo apt-get update && sudo apt-get install logstash
systemctl enable logstash.service --now systemctl is-active logstash.service active
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch cat > /etc/yum.repos.d/logstash.repo <<EOF [logstash-9.x] name=Elastic repository for 9.x packages baseurl=https://artifacts.elastic.co/packages/9.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md EOF yum -y install logstash
cat > /etc/logstash/conf.d/ssh_filter.conf <<'EOF' input { beats { port => 5044 } } filter { grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} sshd\[\d+\]: %{WORD:auth_result} password for %{USER:username} from %{IP:src_ip} port %{NUMBER:port}" } } if [auth_result] == "Failed" { mutate { add_field => { "event_type" => "ssh_brute_force" } replace => { "severity" => 3 } } } } output { stdout { codec => rubydebug } elasticsearch { hosts => ["https://192.168.10.100:9200"] index => "syslog-security-%{+YYYY.MM.dd}" api_key => "MnQxUUJaMEIzMlk0TE9TWnFaeEQ6UjAyLVloeUFNRGw4Rl85d25BeEM3dw==" ssl_verification_mode => "none" } } EOF
这里的工作流解释: 1.接收beats插件给5044端口的数据,beats字段指插件 2.通过filiter使用grok插件规定格式进行正则筛选 3.对数据进行清洗,如果是Failed就标记严重性为3 4.将输出发送给控制台(debug),同时发送给es集群
systemctl restart logstash.service
tail -f /var/log/logstash/logstash-plain.log
ss -tunlp | grep 5044 tcp LISTEN 0 4096 *:5044 *:* users:(("java",pid=62517,fd=154))
|
部署与使用filebeat
极轻量的日志收集器,只搬运不加工,支持断点续传(offset偏移量)
适用二进制部署和k8s集群部署,我这k8s集群有点问题,暂时先用二进制部署
参考文档:https://www.elastic.co/docs/reference/beats/filebeat
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-9.3.2-x86_64.rpm rpm -vi filebeat-9.3.2-x86_64.rpm
filebeat.inputs:
filebeat.inputs: - type: filestream id: ssh-security-logs enabled: true paths: - /var/log/secure
output.logstash: hosts: ["192.168.10.100:5044"]
systemctl restart filebeat.service
|
测试链路
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| logger "Mar 21 22:00:01 prod-web-01 sshd[123]: Failed password for root from 192.168.10.105 port 5566 ssh2"
journalctl -u logstash -f Mar 23 15:30:42 1panel logstash[128105]: "@version" => "1", Mar 23 15:30:42 1panel logstash[128105]: "timestamp" => "Mar 23 15:30:00", Mar 23 15:30:42 1panel logstash[128105]: "auth_result" => "Failed", Mar 23 15:30:42 1panel logstash[128105]: "port" => "5566", Mar 23 15:30:42 1panel logstash[128105]: "ecs" => { Mar 23 15:30:42 1panel logstash[128105]: "version" => "8.0.0" Mar 23 15:30:42 1panel logstash[128105]: }, Mar 23 15:30:42 1panel logstash[128105]: "log" => { Mar 23 15:30:42 1panel logstash[128105]: "file" => { Mar 23 15:30:42 1panel logstash[128105]: "inode" => "67682980", Mar 23 15:30:42 1panel logstash[128105]: "fingerprint" => "98136ef7ebcc4f2aaf3c2f6ec8556cf7cb6674e1d26ad145fd40c8f4b98bad95", Mar 23 15:30:42 1panel logstash[128105]: "path" => "/var/log/secure", Mar 23 15:30:42 1panel logstash[128105]: "device_id" => "64768" Mar 23 15:30:42 1panel logstash[128105]: }, Mar 23 15:30:42 1panel logstash[128105]: "offset" => 14796257 Mar 23 15:30:42 1panel logstash[128105]: }, Mar 23 15:30:42 1panel logstash[128105]: "tags" => [ Mar 23 15:30:42 1panel logstash[128105]: [0] "beats_input_codec_plain_applied" Mar 23 15:30:42 1panel logstash[128105]: ], Mar 23 15:30:42 1panel logstash[128105]: "input" => { Mar 23 15:30:42 1panel logstash[128105]: "type" => "filestream" Mar 23 15:30:42 1panel logstash[128105]: }, Mar 23 15:30:42 1panel logstash[128105]: "severity" => "3", Mar 23 15:30:42 1panel logstash[128105]: "src_ip" => "10.2.3.6", Mar 23 15:30:42 1panel logstash[128105]: "event_type" => "ssh_brute_force", Mar 23 15:30:42 1panel logstash[128105]: "username" => "root", Mar 23 15:30:42 1panel logstash[128105]: "@timestamp" => 2026-03-23T07:30:40.640Z, Mar 23 15:30:42 1panel logstash[128105]: "message" => "Mar 23 15:30:38 1panel root[344379]: Mar 23 15:30:00 1panel sshd[12345]: Failed password for root from 10.2.3.6 port 5566 ssh2"
|
初步使用kibana

1 2 3 4
| KQL还可以支持一些高级语法,比如 排除法: NOT src_ip : "192.168.10.1" 多重条件: event_type : "ssh_brute_force" AND username : "root" 范围查找: port > 1024
|