1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
@staticmethod
@app.get("/api/read_base_sw_switch")
async def read_base_type():
    """Report the base software type printed by the external command.

    Runs the command with a 10 second timeout and maps its standard output
    (surrounding whitespace ignored) to a response payload:
    ``"Test"`` -> ``"test"`` and ``"Fleet"`` -> ``"fleet"``.

    Returns:
        A dict with keys ``code``, ``data`` and ``msg``. ``code`` is 0 when
        the output is one of the recognised values, otherwise 1 with an
        empty ``data``.

    Raises:
        HTTPException: 504 if the command times out, 400 if it exits with a
            non-zero status, 500 for any other failure while running it.
    """
    # NOTE(review): subprocess.run is a blocking call; inside an `async def`
    # handler it holds the event loop for up to the 10 s timeout. Consider a
    # plain `def` handler or an executor if this endpoint is hit concurrently.
    try:
        result = subprocess.run(
            ['xxxx'], timeout=10, capture_output=True, text=True, check=True
        )
    except subprocess.TimeoutExpired as e:
        raise HTTPException(504, "Command timed out") from e
    except subprocess.CalledProcessError as e:
        # text=True means stderr is already a str; calling .decode() on it
        # raised AttributeError and masked the intended 400 response.
        raise HTTPException(400, f"Command failed: {e.stderr or ''}") from e
    except Exception as e:
        raise HTTPException(500, f"Internal error: {str(e)}") from e

    # Command-line tools normally terminate their output with a newline, so
    # compare against the stripped text rather than the raw stdout.
    reported = result.stdout.strip()
    known_types = {"Test": "test", "Fleet": "fleet"}
    if reported in known_types:
        return {"code": 0, "data": known_types[reported], "msg": "success"}
    return {"code": 1, "data": "", "msg": "error not get the execpted value"}
@staticmethod
@app.post("/api/set_base_sw_switch")
async def switch_base_type(base_type: dict):
    """Switch the base software type by running the matching command.

    Args:
        base_type: Request body; its ``'type'`` entry selects the command
            and must be ``"test"`` or ``"fleet"``.

    Returns:
        A ``JSONResponse`` (status 200, ``code`` 0) when the command
        succeeds, or a plain dict with ``code`` 1 when ``'type'`` is missing
        or is not one of the accepted values.

    Raises:
        HTTPException: 504 if the command times out, 500 if it exits with a
            non-zero status or any other error occurs.
    """
    # NOTE(review): subprocess.run is a blocking call; inside an `async def`
    # handler it holds the event loop for up to the 10 s timeout.
    try:
        # .get() so that a body without 'type' is reported as invalid input
        # (below) instead of surfacing as a 500 "Internal error: 'type'".
        type_value = base_type.get('type')
        if type_value == "test":
            subprocess.run([xxxx], timeout=10, capture_output=True, check=True)
        elif type_value == "fleet":
            subprocess.run([xxxx], timeout=10, capture_output=True, check=True)
        else:
            return {"code": 1, "data": "", "msg": "Invalid 'type' value: must be 'test' or 'fleet'"}
    except subprocess.TimeoutExpired as e:
        raise HTTPException(504, "Command timed out") from e
    except subprocess.CalledProcessError as e:
        # stderr is bytes here (no text=True); decode leniently so that
        # non-UTF-8 output cannot raise a second error inside this handler.
        error_detail = (e.stderr or b"").decode(errors="replace").strip() or "No error details"
        raise HTTPException(500, f"Command failed: {error_detail}") from e
    except Exception as e:
        raise HTTPException(500, f"Internal error: {str(e)}") from e

    return JSONResponse(
        status_code=200,
        content={"code": 0, "data": "", "msg": "successful set"},
    )
|