1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
| from datetime import date, datetime import os
import logging current_dir = os.path.dirname(os.path.abspath(__file__)) alerts_log = os.path.join(current_dir, 'alerts.log') alter_report = os.path.join(current_dir, 'alter_report.txt') format = '%(asctime)s - %(levelname)s - %(message)s' logging.basicConfig(filename=alerts_log,filemode='a',format=format,level=logging.INFO)
from langchain_deepseek import ChatDeepSeek from dotenv import load_dotenv load_dotenv() def call_deepseek(message): deepseek = ChatDeepSeek(model="deepseek-chat",temperature=0.5) logging.debug(f"Deepseek初始化完成...")
system_prompt = """ 你是一名资深运维与安全分析师,擅长分析业务系统异常告警。 请根据提供的登录监控告警信息,生成结构化分析报告,包含: 1. 告警基本信息(名称、级别、时间、关键标签) 2. 可能的原因分析(结合业务场景,如攻击、系统故障、用户行为异常等) 3. 具体处理建议(分步骤说明,可操作) 4. 预防措施(如何避免类似问题再次发生) 报告风格需专业、简洁,重点突出。 """ user_prompt = f""" 告警详细信息如下: {message} """
messages = [system_prompt,user_prompt] response = deepseek.invoke(messages) with open(alter_report, "w", encoding="utf-8") as f: f.write(f"[{datetime.now()}] DeepSeek报告:\n") f.write(f"{response.content}") logging.info(f"DeepSeek响应已保存至 {alter_report}")
from flask import Flask, request, jsonify app = Flask(__name__) @app.route('/alter', methods=['POST']) def handle_alert(): try: data = request.get_json() if 'alerts' not in data: logging.error("数据格式错误:缺少 alerts 字段") return jsonify({"status": "error", "message": "Invalid data format: missing alerts field"}), 400 if not isinstance(data['alerts'], list): logging.error("数据格式错误:alerts 字段不是列表") return jsonify({"status": "error", "message": "alerts field is not a list"}), 400 logging.info("成功接收到告警数据...") alerts_list = data['alerts'] all_messages = [] for alert in alerts_list: logging.debug("开始解析告警信息...") annotations = alert.get('annotations', {}) description = annotations.get('description', 'No description provided') summary = annotations.get('summary', 'No summary provided') labels = alert.get('labels', {}) alert_name = labels.get('alertname', 'Unknown') severity = labels.get('severity', 'Unknown') starts_at = alert.get('startsAt', 'Unknown') status = alert.get('status', 'Unknown') message = f"告警名称: {alert_name}\n" \ f"严重程度: {severity}\n" \ f"状 态: {status}\n" \ f"开始时间: {starts_at}\n" \ f"摘要: {summary}\n" \ f"描述: {description}\n" all_messages.append(message) logging.debug("告警提取完成...") if all_messages: final_content = "\n" + "="*20 + " 批量告警汇总 " + "="*20 + "\n" final_content += "\n---\n".join(all_messages) logging.info(f"正在发送 {len(all_messages)} 条告警至 DeepSeek 分析...") call_deepseek(final_content) return jsonify({"status": "success"}), 200 except Exception as e: logging.error(f"处理告警时出错: {e}") return jsonify({"status": "error", "message": str(e)}), 400
if __name__ == '__main__': try: print(f"告警日志文件路径: {alerts_log}") app.run(host='0.0.0.0', port=5008, debug=True) except Exception as e: print(f"启动应用时出错: {e}")
|