1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
| import requests import time import psutil import logging from logging.handlers import RotatingFileHandler import os from xml.etree import ElementTree as ET from urllib.parse import urlparse
JENKINS_URL = "xxxx" NODE_NAME = "slave-1" ADMIN = "xxxx" API_TOKEN = "xxxx" KEYWORD = "100%" LOG_PATH = "xxxx"
log_dir = os.path.dirname(LOG_PATH)
try: os.makedirs(log_dir, exist_ok=True) except PermissionError as e: raise SystemExit(f"✖ 无权限创建日志目录: {log_dir}\n错误详情: {e}")
logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler( filename=LOG_PATH, maxBytes=5*1024*1024, backupCount=5, encoding='utf-8' ) file_handler.setFormatter(logging.Formatter( '[%(asctime)s] [%(levelname).4s] %(message)s' ))
console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter( '[%(levelname)s] %(message)s' ))
logger.handlers.clear() logger.addHandler(file_handler) logger.addHandler(console_handler)
session = requests.Session() session.auth = (ADMIN, API_TOKEN) crumb_header = None original_executors = 1
def init_jenkins(): """初始化 Jenkins 连接""" global crumb_header, original_executors crumb_header = get_crumb_header() print(f"√ 初始化完成 | 原始并发数: {original_executors}")
def get_crumb_header(): """获取Jenkins CSRF防护令牌""" crumb_url = f"{JENKINS_URL}/crumbIssuer/api/json" try: response = session.get(crumb_url) data = response.json() return {data['crumbRequestField']: data['crumb']} except Exception as e: print(f"× 获取 CSRF 失败: {e}") return None
def get_current_executors(): """获取当前并发执行器配置""" config_url = f"{JENKINS_URL}/computer/{NODE_NAME}/config.xml" try: response = session.get(config_url, headers=crumb_header) root = ET.fromstring(response.content) return int(root.find(".//numExecutors").text) except Exception as e: print(f"× 获取并发数失败: {e}") return 1
def get_active_builds(): """获取当前运行中的构建任务列表,返回任务url""" api_url = f"{JENKINS_URL}/computer/{NODE_NAME}/api/json" params = {'tree': 'executors[currentExecutable[url]]'} try: response = session.get(api_url, headers=crumb_header, params=params) return [ parse_build_info(executor['currentExecutable']['url']) for executor in response.json().get('executors', []) if executor.get('currentExecutable', {}).get('url') ] except Exception as e: print(f"× 获取任务失败: {e}") return []
def parse_build_info(url): """解析构建任务URL,返回一个字典""" path = urlparse(url).path.split('/') return { 'job': path[2], 'build_num': path[3], 'url': url }
def check_keyword_in_log(build_info): """检查构建日志是否包含关键标识""" log_url = f"{JENKINS_URL}/job/{build_info['job']}/{build_info['build_num']}/logText/progressiveText" try: response = session.get(log_url, headers=crumb_header) return KEYWORD in response.text except Exception as e: print(f"× 检查日志失败: {build_info['job']} #{build_info['build_num']} - {e}") return False
def adjust_executors(target_num): """调整节点并发执行器数量""" config_url = f"{JENKINS_URL}/computer/{NODE_NAME}/config.xml" try: response = session.get(config_url, headers=crumb_header) root = ET.fromstring(response.content) num_exec = root.find(".//numExecutors") num_exec.text = str(target_num) response = session.post( config_url, headers={'Content-Type': 'text/xml'}, data=ET.tostring(root), auth=session.auth ) return response.status_code == 200 except Exception as e: print(f"× 调整并发数失败: {e}") return False
def get_load_psutil(): load = psutil.getloadavg() cpu_count = psutil.cpu_count()
is_decreasing = (load[0] < load[1] < load[2])
return { '1min': load[0], '5min': load[1], '15min': load[2], 'cpu_cores': cpu_count, 'load_per_core': load[0] / cpu_count, 'is_decreasing': is_decreasing }
if __name__ == "__main__": try: init_jenkins() builds = get_active_builds() passed = sum(1 for build in builds if check_keyword_in_log(build)) if passed > 0: target = passed + 1 logger.info(f"√ 检测到 {passed} 个任务通过编译,尝试调整并发数到 {target}") load_info = get_load_psutil() if load_info['is_decreasing'] or load_info['load_per_core'] < 0.5: if load_info['is_decreasing']: logger.info("√ 负载正在下降,可以进行调整") if load_info['load_per_core'] < 0.5: logger.info(f"√ 低负载({load_info['load_per_core']:.2f}<0.5)")
if adjust_executors(target): logger.info("~ 等待配置生效...") time.sleep(2) if adjust_executors(original_executors): logger.info(f"√ 已恢复并发数到 {original_executors}") else: logger.error("× 恢复并发数失败!") else: logger.error("× 调整并发数失败!") else: logger.warning("! 负载未呈现下降趋势,拒绝调整") else: logger.info(f"~ 当前无任务通过编译阶段,维持并发数 {original_executors}") except KeyboardInterrupt: logger.error("\n× 监控已手动终止") except Exception as e: logger.error(f"× 发生未知错误: {e}", exc_info=True) time.sleep(10)
result = get_load_psutil() logger.info(""" [系统负载情况] 1分钟负载: %.2f 5分钟负载: %.2f 15分钟负载: %.2f CPU核心数: %d 单核负载率: %.2f 是否负载下降: %s """, result['1min'], result['5min'], result['15min'], result['cpu_cores'], result['load_per_core'], '是' if result['is_decreasing'] else '否')
|