#!/usr/bin/env python3 """ ClearPilot bench onroad simulator. Publishes fake messages to make the UI go onroad and display configurable vehicle state. Control values via params in /dev/shm/params: BenchSpeed - vehicle speed in mph (default: 0) BenchSpeedLimit - speed limit in mph (default: 0, 0=hidden) BenchCruiseSpeed - cruise set speed in mph (default: 0, 0=not set) BenchCruiseActive - 0=disabled, 1=active, 2=paused/standstill (default: 0) BenchGear - P, D, R, N (default: P) BenchEngaged - 0 or 1, cruise engaged (default: 0) Usage: su - comma -c "PYTHONPATH=/data/openpilot python3 -m selfdrive.clearpilot.bench_onroad" To stop: Ctrl+C or kill the process. UI returns to offroad. """ import time import cereal.messaging as messaging from cereal import log, car from openpilot.common.params import Params from openpilot.common.conversions import Conversions as CV from openpilot.selfdrive.clearpilot.speed_logic import SpeedState def main(): params = Params() params_mem = Params("/dev/shm/params") # Set defaults params_mem.put("BenchSpeed", "0") params_mem.put("BenchSpeedLimit", "0") params_mem.put("BenchCruiseSpeed", "0") params_mem.put("BenchCruiseActive", "0") params_mem.put("BenchGear", "P") params_mem.put("BenchEngaged", "0") # ClearPilot speed processing speed_state = SpeedState() # thermald handles deviceState (reads our fake pandaStates for ignition) pm = messaging.PubMaster([ "pandaStates", "carState", "controlsState", "driverMonitoringState", "liveCalibration", ]) print("Bench onroad simulator started") print("Set values in /dev/shm/params/d/Bench*") print(" BenchSpeed=0 BenchSpeedLimit=0 BenchCruiseSpeed=0 BenchGear=P BenchEngaged=0") gear_map = { "P": car.CarState.GearShifter.park, "D": car.CarState.GearShifter.drive, "R": car.CarState.GearShifter.reverse, "N": car.CarState.GearShifter.neutral, } frame = 0 try: while True: # Read bench params speed_mph = float((params_mem.get("BenchSpeed", encoding="utf-8") or "0").strip()) speed_limit_mph = float((params_mem.get("BenchSpeedLimit", encoding="utf-8") or "0").strip()) cruise_mph = float((params_mem.get("BenchCruiseSpeed", encoding="utf-8") or "0").strip()) gear_str = (params_mem.get("BenchGear", encoding="utf-8") or "P").strip().upper() cruise_active_str = (params_mem.get("BenchCruiseActive", encoding="utf-8") or "0").strip() engaged = (params_mem.get("BenchEngaged", encoding="utf-8") or "0").strip() == "1" speed_ms = speed_mph * CV.MPH_TO_MS speed_limit_ms = speed_limit_mph * CV.MPH_TO_MS cruise_ms = cruise_mph * CV.MPH_TO_MS gear = gear_map.get(gear_str, car.CarState.GearShifter.park) # Cruise state: 0=disabled, 1=active, 2=paused cruise_active = cruise_active_str == "1" cruise_standstill = cruise_active_str == "2" # ClearPilot speed processing (~2 Hz at 10 Hz loop) if frame % 5 == 0: has_speed = speed_mph > 0 speed_state.update(speed_ms, has_speed, speed_limit_ms, is_metric=False, cruise_speed_ms=cruise_ms, cruise_active=cruise_active or cruise_standstill, cruise_standstill=cruise_standstill) # pandaStates — 10 Hz (thermald reads ignition from this) if frame % 1 == 0: dat = messaging.new_message("pandaStates", 1) dat.pandaStates[0].ignitionLine = True dat.pandaStates[0].pandaType = log.PandaState.PandaType.tres pm.send("pandaStates", dat) # carState — 10 Hz dat = messaging.new_message("carState") cs = dat.carState cs.vEgo = speed_ms cs.vEgoCluster = speed_ms cs.gearShifter = gear cs.standstill = speed_ms < 0.1 cs.cruiseState.available = True cs.cruiseState.enabled = engaged cs.cruiseState.speed = cruise_mph * CV.MPH_TO_MS if cruise_mph > 0 else 0 pm.send("carState", dat) # controlsState — 10 Hz dat = messaging.new_message("controlsState") ctl = dat.controlsState ctl.vCruise = cruise_mph * 1.60934 if cruise_mph > 0 else 255 # km/h or 255=not set ctl.vCruiseCluster = ctl.vCruise ctl.enabled = engaged ctl.active = engaged pm.send("controlsState", dat) # driverMonitoringState — low freq if frame % 10 == 0: dat = messaging.new_message("driverMonitoringState") dat.driverMonitoringState.isActiveMode = True pm.send("driverMonitoringState", dat) # liveCalibration — low freq if frame % 50 == 0: dat = messaging.new_message("liveCalibration") dat.liveCalibration.calStatus = log.LiveCalibrationData.Status.calibrated dat.liveCalibration.rpyCalib = [0.0, 0.0, 0.0] pm.send("liveCalibration", dat) frame += 1 time.sleep(0.1) # 10 Hz base loop except KeyboardInterrupt: print("\nBench simulator stopped") if __name__ == "__main__": main()