
| import requests import time import psutil import logging from logging.handlers import RotatingFileHandler import os from xml.etree import ElementTree as ET from urllib.parse import urlparse
JENKINS_URL = "xxxx" NODE_NAME = "slave-1" ADMIN = "xxxx" API_TOKEN = "xxxx" KEYWORD = "100%" LOG_PATH = "xxxx"
log_dir = os.path.dirname(LOG_PATH)
try: os.makedirs(log_dir, exist_ok=True) except PermissionError as e: raise SystemExit(f"✖ 无权限创建日志目录: {log_dir}\n错误详情: {e}")
logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler( filename=LOG_PATH, maxBytes=5*1024*1024, backupCount=5, encoding='utf-8' ) file_handler.setFormatter(logging.Formatter( '[%(asctime)s] [%(levelname).4s] %(message)s' ))
console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter( '[%(levelname)s] %(message)s' ))
logger.handlers.clear() logger.addHandler(file_handler) logger.addHandler(console_handler)
session = requests.Session() session.auth = (ADMIN, API_TOKEN) crumb_header = None original_executors = 1
def init_jenkins(): """初始化 Jenkins 连接""" global crumb_header, original_executors crumb_header = get_crumb_header() print(f"√ 初始化完成 | 原始并发数: {original_executors}")
def get_crumb_header(): """获取Jenkins CSRF防护令牌""" crumb_url = f"{JENKINS_URL}/crumbIssuer/api/json" try: response = session.get(crumb_url) data = response.json() return {data['crumbRequestField']: data['crumb']} except Exception as e: print(f"× 获取 CSRF 失败: {e}") return None
def get_current_executors(): """获取当前并发执行器配置""" config_url = f"{JENKINS_URL}/computer/{NODE_NAME}/config.xml" try: response = session.get(config_url, headers=crumb_header) root = ET.fromstring(response.content) return int(root.find(".//numExecutors").text) except Exception as e: print(f"× 获取并发数失败: {e}") return 1
def get_active_builds(): """获取当前运行中的构建任务列表,返回任务url""" api_url = f"{JENKINS_URL}/computer/{NODE_NAME}/api/json" params = {'tree': 'executors[currentExecutable[url]]'} try: response = session.get(api_url, headers=crumb_header, params=params) return [ parse_build_info(executor['currentExecutable']['url']) for executor in response.json().get('executors', []) if executor.get('currentExecutable', {}).get('url') ] except Exception as e: print(f"× 获取任务失败: {e}") return []
def parse_build_info(url): """解析构建任务URL,返回一个字典""" path = urlparse(url).path.split('/') return { 'job': path[2], 'build_num': path[3], 'url': url }
def check_keyword_in_log(build_info): """检查构建日志是否包含关键标识""" log_url = f"{JENKINS_URL}/job/{build_info['job']}/{build_info['build_num']}/logText/progressiveText" try: response = session.get(log_url, headers=crumb_header) return KEYWORD in response.text except Exception as e: print(f"× 检查日志失败: {build_info['job']} #{build_info['build_num']} - {e}") return False
def adjust_executors(target_num): """调整节点并发执行器数量""" config_url = f"{JENKINS_URL}/computer/{NODE_NAME}/config.xml" try: response = session.get(config_url, headers=crumb_header) root = ET.fromstring(response.content) num_exec = root.find(".//numExecutors") num_exec.text = str(target_num) response = session.post( config_url, headers={'Content-Type': 'text/xml'}, data=ET.tostring(root), auth=session.auth ) return response.status_code == 200 except Exception as e: print(f"× 调整并发数失败: {e}") return False
def get_load_psutil(): load = psutil.getloadavg() cpu_count = psutil.cpu_count()
is_decreasing = (load[0] < load[1] < load[2])
return { '1min': load[0], '5min': load[1], '15min': load[2], 'cpu_cores': cpu_count, 'load_per_core': load[0] / cpu_count, 'is_decreasing': is_decreasing }
if __name__ == "__main__": try: init_jenkins() builds = get_active_builds() passed = sum(1 for build in builds if check_keyword_in_log(build)) if passed > 0: target = passed + 1 logger.info(f"√ 检测到 {passed} 个任务通过编译,尝试调整并发数到 {target}") load_info = get_load_psutil() if load_info['is_decreasing'] or load_info['load_per_core'] < 0.5: if load_info['is_decreasing']: logger.info("√ 负载正在下降,可以进行调整") if load_info['load_per_core'] < 0.5: logger.info(f"√ 低负载({load_info['load_per_core']:.2f}<0.5)")
if adjust_executors(target): logger.info("~ 等待配置生效...") time.sleep(2) if adjust_executors(original_executors): logger.info(f"√ 已恢复并发数到 {original_executors}") else: logger.error("× 恢复并发数失败!") else: logger.error("× 调整并发数失败!") else: logger.warning("! 负载未呈现下降趋势,拒绝调整") else: logger.info(f"~ 当前无任务通过编译阶段,维持并发数 {original_executors}") except KeyboardInterrupt: logger.error("\n× 监控已手动终止") except Exception as e: logger.error(f"× 发生未知错误: {e}", exc_info=True) time.sleep(10)
result = get_load_psutil() logger.info(""" [系统负载情况] 1分钟负载: %.2f 5分钟负载: %.2f 15分钟负载: %.2f CPU核心数: %d 单核负载率: %.2f 是否负载下降: %s """, result['1min'], result['5min'], result['15min'], result['cpu_cores'], result['load_per_core'], '是' if result['is_decreasing'] else '否')
|