Akemi

Python subprocess模块案例—执行系统命令与创建子进程

2024/11/03

可以直接将命令传入系统命令

run模块——执行系统命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#coding=utf-8
import subprocess

# 使用列表存放传入的命令
# capture_output——捕获输出信息
# text——编码显示
result=subprocess.run(['ls','-l'],capture_output=True,text=True)

# stdout——标准输出
print(result.stdout)

# returncode返回码
result=subprocess.run('df -Th',shell=True)
print(result.stdout,result.returncode)

# 也可以使用input进行标准输入的传入
# timeout超时时间——使用测试网络、运行命令错误

try:
result=subprocess.run('sleep 5', shell=True,timeout=3)
print(result.stdout)
# 用except捕获错误信息
except subprocess.TimeoutExpired:
print("命令执行超时")

# check检查——执行失败自动抛出异常
try:
result=subprocess.run('ls /qweqweq',shell=True,check=True)
except subprocess.CalledProcessError as e:
print(f"报错{e}")

Pepen方法——创建与管理子进程

相当于将run放到子进程中运行,并且可以传入stdin,读取stdout stderr

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#coding=utf-8
import subprocess

process=subprocess.Popen(['ls','/qwe'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)
# stdin向子进程发送数据
stdout,stderr=process.communicate()
print(f"标准输出:{stdout}")
print(f"错误信息:{stderr}")

# wait方法——等待运行完成,并且返回rc
process=subprocess.Popen(['sleep','2'])
exit_code=process.wait()
print(exit_code)
# 返回0

关闭子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#coding=utf-8
import time
import subprocess

# terminate() 优雅关闭子进程
# kill() 强制终止子进程
process=subprocess.Popen(['sleep','15'])

time.sleep(5)
print(f"子进程PID:{process.pid}")

process.terminate()
print("终止信号发送")

time.sleep(1)

# 检查进程状态
status=process.poll()
if status is None:
print("进程仍然运行,准备强制终止")
process.kill()
print("子进程已强制终止")

案例:监控磁盘使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import subprocess

def check_disk_usage(threshold=80,top_n=5):
try:
result=subprocess.run(['df','-h'],text=True,capture_output=True,check=True)
output=result.stdout
# 磁盘使用情况
print("磁盘使用情况:\n",output)
# 检查磁盘使用率
lines=output.splitlines()
for line in lines[1:]:
# 将每行进行切割,返回一个列表
columes=line.split()
# 将列表索引4(利用率)提取出来,并且去掉末尾的%
percent_userd=int(columes[4].rstrip('%'))
# 对比利用率与前面设定的阈值
if percent_userd > threshold:
print(f"磁盘使用率超过阈值:{line}")
# 获取目录最大文件
result=subprocess.run(['du','-ahx','/'],capture_output=True,text=True,check=True)
output=result.stdout
lines=output.splitlines()
dir_usage={}
for line in lines:
# 只切割一次,分别赋给size和path
size,path=line.split(maxsplit=1)
dir_usage[path]=size
# 找出使用磁盘最多的目录(size最大的path)
sorted_dir=sorted(dir_usage.items(),key=lambda x:x[1],reverse=True)
print("占用空间最大的目录:")
for path,size in sorted_dir[:top_n]:
print(f'{path}:{size}')
except subprocess.CalledProcessError as e:
print(f"命令执行失败:{e}")
except Exception as e:
print(f"发生异常:{e}")

check_disk_usage()
CATALOG
  1. 1. 案例:监控磁盘使用