Plays a ding via soundd when the cruise warning sign becomes visible (cruise set speed out of range vs speed limit) or when the speed limit changes while the warning sign is already showing. Max 1 ding per 30s. Ding is mixed independently into soundd output at max volume without interrupting alert sounds. bench_cmd ding available for manual trigger. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
148 lines
4.3 KiB
Python
148 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ClearPilot bench command tool. Sets bench params and queries UI state.
|
|
|
|
Usage:
|
|
python3 -m selfdrive.clearpilot.bench_cmd gear D
|
|
python3 -m selfdrive.clearpilot.bench_cmd speed 20
|
|
python3 -m selfdrive.clearpilot.bench_cmd speedlimit 45
|
|
python3 -m selfdrive.clearpilot.bench_cmd cruise 55
|
|
python3 -m selfdrive.clearpilot.bench_cmd cruiseactive 0|1|2 (0=disabled, 1=active, 2=paused)
|
|
python3 -m selfdrive.clearpilot.bench_cmd engaged 1
|
|
python3 -m selfdrive.clearpilot.bench_cmd ding (trigger speed limit ding sound)
|
|
python3 -m selfdrive.clearpilot.bench_cmd debugbutton (simulate LKAS debug button press)
|
|
python3 -m selfdrive.clearpilot.bench_cmd dump
|
|
python3 -m selfdrive.clearpilot.bench_cmd wait_ready
|
|
"""
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import zmq
|
|
from openpilot.common.params import Params
|
|
|
|
|
|
def check_ui_process():
|
|
"""Check if UI process is running and healthy. Returns error string or None if OK."""
|
|
try:
|
|
result = subprocess.run(["pgrep", "-a", "-f", "./ui"], capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
return "ERROR: UI process not running"
|
|
# Get the PID and check its uptime
|
|
for line in result.stdout.strip().split("\n"):
|
|
parts = line.split(None, 1)
|
|
if len(parts) >= 2 and "./ui" in parts[1]:
|
|
pid = parts[0]
|
|
try:
|
|
stat = os.stat(f"/proc/{pid}")
|
|
uptime = time.time() - stat.st_mtime
|
|
if uptime < 5:
|
|
return f"ERROR: UI process (pid {pid}) uptime {uptime:.1f}s — likely crash looping. Check: tail /data/log2/$(ls -t /data/log2/ | head -1)/session.log"
|
|
except FileNotFoundError:
|
|
return "ERROR: UI process disappeared"
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def ui_dump():
|
|
ctx = zmq.Context()
|
|
sock = ctx.socket(zmq.REQ)
|
|
sock.setsockopt(zmq.RCVTIMEO, 2000)
|
|
sock.connect("ipc:///tmp/clearpilot_ui_rpc")
|
|
sock.send_string("dump")
|
|
try:
|
|
return sock.recv_string()
|
|
except zmq.Again:
|
|
return None
|
|
finally:
|
|
sock.close()
|
|
ctx.term()
|
|
|
|
|
|
def wait_ready(timeout=20):
|
|
"""Wait until the UI shows ReadyWindow, up to timeout seconds."""
|
|
start = time.time()
|
|
while time.time() - start < timeout:
|
|
dump = ui_dump()
|
|
if dump and "ReadyWindow" in dump:
|
|
# Check it's actually visible
|
|
for line in dump.split("\n"):
|
|
if "ReadyWindow" in line and "vis=Y" in line:
|
|
print("UI ready (ReadyWindow visible)")
|
|
return True
|
|
time.sleep(1)
|
|
print(f"ERROR: UI not ready after {timeout}s")
|
|
if dump:
|
|
print(dump)
|
|
return False
|
|
|
|
|
|
def main():
|
|
if len(sys.argv) < 2:
|
|
print(__doc__)
|
|
return
|
|
|
|
cmd = sys.argv[1].lower()
|
|
params = Params("/dev/shm/params")
|
|
|
|
param_map = {
|
|
"gear": "BenchGear",
|
|
"speed": "BenchSpeed",
|
|
"speedlimit": "BenchSpeedLimit",
|
|
"cruise": "BenchCruiseSpeed",
|
|
"cruiseactive": "BenchCruiseActive",
|
|
"engaged": "BenchEngaged",
|
|
}
|
|
|
|
if cmd == "dump":
|
|
ui_status = check_ui_process()
|
|
if ui_status:
|
|
print(ui_status)
|
|
else:
|
|
result = ui_dump()
|
|
if result:
|
|
print(result)
|
|
else:
|
|
print("ERROR: UI not responding")
|
|
|
|
elif cmd == "wait_ready":
|
|
wait_ready()
|
|
|
|
elif cmd == "ding":
|
|
params.put("ClearpilotPlayDing", "1")
|
|
print("Ding triggered")
|
|
|
|
elif cmd == "debugbutton":
|
|
# Simulate LKAS debug button — same state machine as controlsd.clearpilot_state_control()
|
|
current = params.get_int("ScreenDisplayMode")
|
|
gear = (params.get("BenchGear") or b"P").decode().strip().upper()
|
|
in_drive = gear in ("D", "S", "L")
|
|
|
|
if in_drive:
|
|
transitions = {0: 4, 1: 2, 2: 3, 3: 4, 4: 2}
|
|
new_mode = transitions.get(current, 0)
|
|
else:
|
|
new_mode = 0 if current == 3 else 3
|
|
|
|
params.put_int("ScreenDisplayMode", new_mode)
|
|
mode_names = {0: "auto-normal", 1: "auto-nightrider", 2: "normal", 3: "screen-off", 4: "nightrider"}
|
|
print(f"ScreenDisplayMode: {current} ({mode_names.get(current, '?')}) → {new_mode} ({mode_names.get(new_mode, '?')})"
|
|
f" [gear={gear}, in_drive={in_drive}]")
|
|
|
|
elif cmd in param_map:
|
|
if len(sys.argv) < 3:
|
|
print(f"Usage: bench_cmd {cmd} <value>")
|
|
return
|
|
value = sys.argv[2]
|
|
params.put(param_map[cmd], value)
|
|
print(f"{param_map[cmd]} = {value}")
|
|
|
|
else:
|
|
print(f"Unknown command: {cmd}")
|
|
print(__doc__)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|