Akemi

Python自动化调整Jenkins从节点并发数

2025/04/15

纯纯小众宝藏需求

场景

1.Jenkins有多个slave(从)节点,并且可以登录到节点所在的服务器
2.只需要跑一个任务,这个任务会占用大量CPU资源来编译
3.节点性能有限,一次只能编译一个

需求

1.根据任务是否已经通过了编译阶段,来动态调整节点并发数
2.如果N个任务都过了编译阶段,则将并发数调整为N+1,随后调整为1
3.自动化完成,使用shell或python
4.检测服务器负载,负载降低才允许新增并发数

思路

1.通过Jenkins节点的api/json接口(/computer/<节点名>/api/json)获取该节点当前正在运行的任务
2.获取任务ID对应的日志,搜索是否具备编译完成的关键字“100%”
3.使用xml.etree.ElementTree(ET)修改节点的config.xml,再通过POST提交,从而修改并发数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import requests
import time
import psutil
import logging
from logging.handlers import RotatingFileHandler
import os
from xml.etree import ElementTree as ET # 解析与修改XML配置
from urllib.parse import urlparse

# -------------------- Configuration --------------------
JENKINS_URL = "xxxx"  # Jenkins base URL (placeholder); request paths are appended to it
NODE_NAME = "slave-1"  # name of the node whose executor count is adjusted
# Credentials may be supplied through the environment so the real values do
# not have to live in the script; the literals remain the fallback.
ADMIN = os.environ.get("JENKINS_ADMIN", "xxxx")
API_TOKEN = os.environ.get("JENKINS_API_TOKEN", "xxxx")
KEYWORD = "100%"  # marker searched for in the build log
LOG_PATH = "xxxx"  # full path of the log file

# -------------------- Logging setup --------------------
log_dir = os.path.dirname(LOG_PATH)

# Create the log directory. A LOG_PATH that is a bare file name has an empty
# dirname, and os.makedirs("") raises FileNotFoundError, so only create the
# directory when there is one to create.
if log_dir:
    try:
        os.makedirs(log_dir, exist_ok=True)
    except PermissionError as e:
        raise SystemExit(f"✖ 无权限创建日志目录: {log_dir}\n错误详情: {e}")

# Module logger; DEBUG so that both handlers receive every record.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# File handler with rotation: 5 MB per file, 5 backups kept.
file_handler = RotatingFileHandler(
    filename=LOG_PATH,
    maxBytes=5 * 1024 * 1024,
    backupCount=5,
    encoding='utf-8',
)
file_handler.setFormatter(logging.Formatter(
    '[%(asctime)s] [%(levelname).4s] %(message)s'
))

# Console handler (shorter format, no timestamp).
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
    '[%(levelname)s] %(message)s'
))

# Drop any handlers already attached before adding the new ones, so records
# are not emitted twice.
logger.handlers.clear()
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# -------------------------------------------------

# --------------- Shared module state ---------------
original_executors = 1  # executor count the node is set back to after a temporary raise
crumb_header = None  # CSRF header dict; assigned by init_jenkins()
session = requests.Session()  # one Session object reused for every Jenkins request
session.auth = (ADMIN, API_TOKEN)

def init_jenkins():
    """Initialise the module-level Jenkins state.

    Fetches the CSRF crumb header with get_crumb_header() and stores it in the
    global ``crumb_header`` (which stays ``None`` when the fetch fails).
    ``original_executors`` keeps its module default; refreshing it from the
    node configuration is left disabled, as in the original script.
    """
    global crumb_header, original_executors
    crumb_header = get_crumb_header()  # CSRF token header, or None
    # original_executors = get_current_executors()  # disabled: keep the module default
    # Report through the module logger (not print) so the message also lands
    # in the rotating log file, like the messages of the main flow.
    logger.info(f"√ 初始化完成 | 原始并发数: {original_executors}")

def get_crumb_header(timeout=10):
    """Fetch the Jenkins CSRF crumb and return it as a request-header dict.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up, so an
            unresponsive server cannot hang the script.

    Returns:
        ``{crumbRequestField: crumb}`` on success, or ``None`` when the crumb
        cannot be obtained (request error, non-2xx status, unexpected body).
    """
    crumb_url = f"{JENKINS_URL}/crumbIssuer/api/json"
    try:
        response = session.get(crumb_url, timeout=timeout)
        # Fail on HTTP errors explicitly instead of trying to parse an error page.
        response.raise_for_status()
        data = response.json()
        return {data['crumbRequestField']: data['crumb']}
    except Exception as e:
        # Best effort: callers continue without a crumb header.
        logger.error(f"× 获取 CSRF 失败: {e}")
        return None

def get_current_executors(timeout=10):
    """Read the node's configured executor count from its config.xml.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The integer value of the ``numExecutors`` element, or ``1`` when the
        configuration cannot be fetched or parsed.
    """
    config_url = f"{JENKINS_URL}/computer/{NODE_NAME}/config.xml"
    try:
        response = session.get(config_url, headers=crumb_header, timeout=timeout)
        # An error page is not valid node XML; surface the HTTP status instead.
        response.raise_for_status()
        root = ET.fromstring(response.content)
        # Locate the executor-count element anywhere in the document.
        num_exec = root.find(".//numExecutors")
        if num_exec is None or num_exec.text is None:
            raise ValueError("numExecutors element not found in config.xml")
        return int(num_exec.text)
    except Exception as e:
        logger.error(f"× 获取并发数失败: {e}")
        return 1

def get_active_builds(timeout=10):
    """Return the builds currently running on the node.

    Queries ``/computer/<NODE_NAME>/api/json`` for each executor's
    ``currentExecutable.url`` and converts every URL with parse_build_info().

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A list of dicts as produced by parse_build_info(); empty when the
        request fails or nothing is running.
    """
    api_url = f"{JENKINS_URL}/computer/{NODE_NAME}/api/json"
    params = {'tree': 'executors[currentExecutable[url]]'}

    try:
        response = session.get(
            api_url, headers=crumb_header, params=params, timeout=timeout
        )
        response.raise_for_status()
        executors = response.json().get('executors') or []
    except Exception as e:
        logger.error(f"× 获取任务失败: {e}")
        return []

    builds = []
    for executor in executors:
        # The key can be present with a null value (e.g. an idle executor);
        # dict.get's default only applies to a *missing* key, so fall back
        # with `or` to avoid calling .get on None and losing every build.
        current = (executor or {}).get('currentExecutable') or {}
        url = current.get('url')
        if not url:
            continue
        try:
            builds.append(parse_build_info(url))
        except (IndexError, ValueError) as e:
            # One unparsable URL should not discard the other running builds.
            logger.error(f"× 获取任务失败: {url} - {e}")
    return builds

def parse_build_info(url):
    """Split a Jenkins build URL into job path and build number.

    The job path is everything between the first ``job`` path segment and the
    final segment, so plain jobs (``/job/app/12/``), jobs inside folders
    (``/job/team/job/app/12/``) and instances served under a context path
    (``/jenkins/job/app/12/``) are all handled. For a folder job the returned
    ``job`` is ``team/job/app``, which can be placed after ``/job/`` to rebuild
    the build URL.

    Args:
        url: Build URL as reported by the Jenkins API.

    Returns:
        Dict with keys ``job`` (str), ``build_num`` (str) and ``url`` (the
        input, unchanged).

    Raises:
        ValueError: If the path contains no ``job`` segment followed by at
            least a job name and a build number.
    """
    # Drop the empty strings produced by leading/trailing/double slashes.
    segments = [part for part in urlparse(url).path.split('/') if part]
    if 'job' not in segments:
        raise ValueError(f"not a Jenkins build URL: {url}")

    start = segments.index('job') + 1
    if len(segments) - start < 2:
        raise ValueError(f"build URL lacks job name or build number: {url}")

    return {
        'job': '/'.join(segments[start:-1]),
        'build_num': segments[-1],
        'url': url,
    }

def check_keyword_in_log(build_info, timeout=10):
    """Report whether the build's console log contains KEYWORD.

    Args:
        build_info: Dict with at least ``job`` and ``build_num`` keys, as
            returned by parse_build_info().
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True when KEYWORD occurs in the text returned by the build's
        ``logText/progressiveText`` endpoint; False otherwise, including when
        the log cannot be fetched.
    """
    log_url = f"{JENKINS_URL}/job/{build_info['job']}/{build_info['build_num']}/logText/progressiveText"
    try:
        response = session.get(log_url, headers=crumb_header, timeout=timeout)
        # Do not search an HTTP error page as if it were the build log.
        response.raise_for_status()
        return KEYWORD in response.text
    except Exception as e:
        logger.error(f"× 检查日志失败: {build_info['job']} #{build_info['build_num']} - {e}")
        return False

def adjust_executors(target_num, timeout=10):
    """Set the node's executor count by rewriting its config.xml.

    Downloads the node configuration, replaces the text of the
    ``numExecutors`` element with ``target_num`` and POSTs the document back.

    Args:
        target_num: Desired number of executors.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        True when the POST answers with HTTP 200, False on any failure.
    """
    config_url = f"{JENKINS_URL}/computer/{NODE_NAME}/config.xml"

    try:
        # Fetch the current node configuration.
        response = session.get(config_url, headers=crumb_header, timeout=timeout)
        response.raise_for_status()
        root = ET.fromstring(response.content)

        # Locate the executor-count element.
        num_exec = root.find(".//numExecutors")
        if num_exec is None:
            raise ValueError("numExecutors element not found in config.xml")
        num_exec.text = str(target_num)

        # Send the CSRF crumb (when one was obtained) with the state-changing
        # request; the original fetched it but left it off the POST.
        headers = {'Content-Type': 'text/xml'}
        if crumb_header:
            headers.update(crumb_header)

        # Submit the modified configuration. The session's own auth is applied
        # automatically, so it does not need to be passed again.
        response = session.post(
            config_url,
            headers=headers,
            data=ET.tostring(root),
            timeout=timeout,
        )
        return response.status_code == 200
    except Exception as e:
        logger.error(f"× 调整并发数失败: {e}")
        return False

def get_load_psutil():
    """Collect the system load averages and derived indicators via psutil.

    Returns:
        Dict with keys ``1min``, ``5min``, ``15min`` (load averages),
        ``cpu_cores`` (CPU count, at least 1), ``load_per_core`` (1-minute
        load divided by the CPU count) and ``is_decreasing`` (True when
        1min < 5min < 15min).
    """
    one_min, five_min, fifteen_min = psutil.getloadavg()
    # psutil.cpu_count() can return None when the count is undetermined;
    # fall back to 1 so the division below cannot raise TypeError.
    cpu_count = psutil.cpu_count() or 1

    # Load is considered to be falling when the shorter windows are lower
    # than the longer ones.
    is_decreasing = one_min < five_min < fifteen_min

    return {
        '1min': one_min,
        '5min': five_min,
        '15min': fifteen_min,
        'cpu_cores': cpu_count,
        'load_per_core': one_min / cpu_count,
        'is_decreasing': is_decreasing,
    }

def _run_once():
    """Run one check-and-adjust pass against the Jenkins node.

    Counts the running builds whose log already contains KEYWORD. When at
    least one has passed and the host load permits, the executor count is
    raised to ``passed + 1``, held for two seconds, and then set back to
    ``original_executors``.
    """
    init_jenkins()

    # Running builds, and how many of them are past the compile stage.
    builds = get_active_builds()
    passed = sum(1 for build in builds if check_keyword_in_log(build))

    if passed <= 0:
        logger.info(f"~ 当前无任务通过编译阶段,维持并发数 {original_executors}")
        return

    # Strategy: number of passed builds + 1.
    target = passed + 1
    logger.info(f"√ 检测到 {passed} 个任务通过编译,尝试调整并发数到 {target}")

    load_info = get_load_psutil()
    load_is_low = load_info['load_per_core'] < 0.5
    if not (load_info['is_decreasing'] or load_is_low):
        logger.warning("! 负载未呈现下降趋势,拒绝调整")
        return
    if load_info['is_decreasing']:
        logger.info("√ 负载正在下降,可以进行调整")
    if load_is_low:
        logger.info(f"√ 低负载({load_info['load_per_core']:.2f}<0.5)")

    # Raise the executor count.
    if not adjust_executors(target):
        logger.error("× 调整并发数失败!")
        return

    try:
        logger.info("~ 等待配置生效...")
        time.sleep(2)
    finally:
        # Restore in `finally` so an interrupt or error during the wait does
        # not leave the node at the raised executor count.
        if adjust_executors(original_executors):
            logger.info(f"√ 已恢复并发数到 {original_executors}")
        else:
            logger.error("× 恢复并发数失败!")


if __name__ == "__main__":
    try:
        _run_once()
    except KeyboardInterrupt:
        logger.error("\n× 监控已手动终止")
    except Exception as e:
        logger.error(f"× 发生未知错误: {e}", exc_info=True)
        # NOTE(review): indentation was lost in the source; this pause is
        # assumed to belong to the generic error handler — confirm.
        time.sleep(10)

    # Test aid: structured log of the current load figures.
    result = get_load_psutil()
    logger.info("""
[系统负载情况]
1分钟负载: %.2f
5分钟负载: %.2f
15分钟负载: %.2f
CPU核心数: %d
单核负载率: %.2f
是否负载下降: %s
""",
                result['1min'], result['5min'], result['15min'],
                result['cpu_cores'], result['load_per_core'],
                '是' if result['is_decreasing'] else '否')

CATALOG