Files
clearpilot/selfdrive/modeld/dmonitoringmodeld.py
T
brianhansonxyz 47321e3867 restore driving logic to pre-variable-fps baseline
Wholesale revert of driving-relevant files to the snapshot in
/projects/openpilot/archive/clearpilot (HEAD 980f0aa). Goal: get
known-good driving behavior back, then re-introduce optimizations
slowly to track down a "feels like the wheel pulls right" regression.

Files restored from baseline:
- selfdrive/controls/controlsd.py
- selfdrive/controls/lib/events.py
- selfdrive/controls/lib/longitudinal_planner.py
- selfdrive/modeld/modeld.py
- selfdrive/modeld/dmonitoringmodeld.py
- selfdrive/locationd/calibrationd.py
- selfdrive/locationd/paramsd.py
- selfdrive/locationd/torqued.py
- selfdrive/car/interfaces.py
- selfdrive/car/hyundai/carstate.py (CAN-FD telemetry preserved as a
  commented block for future re-enable)
- selfdrive/monitoring/dmonitoringd.py
- selfdrive/frogpilot/controls/frogpilot_planner.py
- common/realtime.py

Intentionally NOT restored (kept as current):
- selfdrive/thermald/* (fan/power tuning kept)
- selfdrive/car/hyundai/carcontroller.py + hyundaicanfd.py (perf-only
  hoist of no_lat_lane_change Params read; behavior-equivalent)
- cereal/services.py, cereal/custom.capnp (additive only)
- selfdrive/manager/*, common/params.cc (heavy ClearPilot
  infrastructure: bench mode, log dir, dashcamd, gpsd, params)
- All selfdrive/ui/, selfdrive/clearpilot/, system/clearpilot/

UI features will be re-wired in a follow-up commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 08:40:02 -05:00

157 lines
5.8 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import gc
import math
import time
import ctypes
import numpy as np
from pathlib import Path
from cereal import messaging
from cereal.messaging import PubMaster, SubMaster
from cereal.visionipc import VisionIpcClient, VisionStreamType, VisionBuf
from openpilot.common.swaglog import cloudlog
from openpilot.common.params import Params
from openpilot.common.realtime import set_realtime_priority
from openpilot.selfdrive.modeld.runners import ModelRunner, Runtime
from openpilot.selfdrive.modeld.models.commonmodel_pyx import sigmoid
CALIB_LEN = 3
REG_SCALE = 0.25
MODEL_WIDTH = 1440
MODEL_HEIGHT = 960
OUTPUT_SIZE = 84
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
MODEL_PATHS = {
ModelRunner.SNPE: Path(__file__).parent / 'models/dmonitoring_model_q.dlc',
ModelRunner.ONNX: Path(__file__).parent / 'models/dmonitoring_model.onnx'}
class DriverStateResult(ctypes.Structure):
_fields_ = [
("face_orientation", ctypes.c_float*3),
("face_position", ctypes.c_float*3),
("face_orientation_std", ctypes.c_float*3),
("face_position_std", ctypes.c_float*3),
("face_prob", ctypes.c_float),
("_unused_a", ctypes.c_float*8),
("left_eye_prob", ctypes.c_float),
("_unused_b", ctypes.c_float*8),
("right_eye_prob", ctypes.c_float),
("left_blink_prob", ctypes.c_float),
("right_blink_prob", ctypes.c_float),
("sunglasses_prob", ctypes.c_float),
("occluded_prob", ctypes.c_float),
("ready_prob", ctypes.c_float*4),
("not_ready_prob", ctypes.c_float*2)]
class DMonitoringModelResult(ctypes.Structure):
_fields_ = [
("driver_state_lhd", DriverStateResult),
("driver_state_rhd", DriverStateResult),
("poor_vision_prob", ctypes.c_float),
("wheel_on_right_prob", ctypes.c_float)]
class ModelState:
inputs: dict[str, np.ndarray]
output: np.ndarray
model: ModelRunner
def __init__(self):
assert ctypes.sizeof(DMonitoringModelResult) == OUTPUT_SIZE * ctypes.sizeof(ctypes.c_float)
self.output = np.zeros(OUTPUT_SIZE, dtype=np.float32)
self.inputs = {
'input_img': np.zeros(MODEL_HEIGHT * MODEL_WIDTH, dtype=np.uint8),
'calib': np.zeros(CALIB_LEN, dtype=np.float32)}
self.model = ModelRunner(MODEL_PATHS, self.output, Runtime.DSP, True, None)
self.model.addInput("input_img", None)
self.model.addInput("calib", self.inputs['calib'])
def run(self, buf:VisionBuf, calib:np.ndarray) -> tuple[np.ndarray, float]:
self.inputs['calib'][:] = calib
v_offset = buf.height - MODEL_HEIGHT
h_offset = (buf.width - MODEL_WIDTH) // 2
buf_data = buf.data.reshape(-1, buf.stride)
input_data = self.inputs['input_img'].reshape(MODEL_HEIGHT, MODEL_WIDTH)
input_data[:] = buf_data[v_offset:v_offset+MODEL_HEIGHT, h_offset:h_offset+MODEL_WIDTH]
t1 = time.perf_counter()
self.model.setInputBuffer("input_img", self.inputs['input_img'].view(np.float32))
self.model.execute()
t2 = time.perf_counter()
return self.output, t2 - t1
def fill_driver_state(msg, ds_result: DriverStateResult):
msg.faceOrientation = [x * REG_SCALE for x in ds_result.face_orientation]
msg.faceOrientationStd = [math.exp(x) for x in ds_result.face_orientation_std]
msg.facePosition = [x * REG_SCALE for x in ds_result.face_position[:2]]
msg.facePositionStd = [math.exp(x) for x in ds_result.face_position_std[:2]]
msg.faceProb = sigmoid(ds_result.face_prob)
msg.leftEyeProb = sigmoid(ds_result.left_eye_prob)
msg.rightEyeProb = sigmoid(ds_result.right_eye_prob)
msg.leftBlinkProb = sigmoid(ds_result.left_blink_prob)
msg.rightBlinkProb = sigmoid(ds_result.right_blink_prob)
msg.sunglassesProb = sigmoid(ds_result.sunglasses_prob)
msg.occludedProb = sigmoid(ds_result.occluded_prob)
msg.readyProb = [sigmoid(x) for x in ds_result.ready_prob]
msg.notReadyProb = [sigmoid(x) for x in ds_result.not_ready_prob]
def get_driverstate_packet(model_output: np.ndarray, frame_id: int, location_ts: int, execution_time: float, dsp_execution_time: float):
model_result = ctypes.cast(model_output.ctypes.data, ctypes.POINTER(DMonitoringModelResult)).contents
msg = messaging.new_message('driverStateV2', valid=True)
ds = msg.driverStateV2
ds.frameId = frame_id
ds.modelExecutionTime = execution_time
ds.dspExecutionTime = dsp_execution_time
ds.poorVisionProb = sigmoid(model_result.poor_vision_prob)
ds.wheelOnRightProb = sigmoid(model_result.wheel_on_right_prob)
ds.rawPredictions = model_output.tobytes() if SEND_RAW_PRED else b''
fill_driver_state(ds.leftDriverData, model_result.driver_state_lhd)
fill_driver_state(ds.rightDriverData, model_result.driver_state_rhd)
return msg
def main():
gc.disable()
set_realtime_priority(1)
model = ModelState()
cloudlog.warning("models loaded, dmonitoringmodeld starting")
Params().put_bool("DmModelInitialized", True)
cloudlog.warning("connecting to driver stream")
vipc_client = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_DRIVER, True)
while not vipc_client.connect(False):
time.sleep(0.1)
assert vipc_client.is_connected()
cloudlog.warning(f"connected with buffer size: {vipc_client.buffer_len}")
sm = SubMaster(["liveCalibration"])
pm = PubMaster(["driverStateV2"])
calib = np.zeros(CALIB_LEN, dtype=np.float32)
# last = 0
while True:
buf = vipc_client.recv()
if buf is None:
continue
sm.update(0)
if sm.updated["liveCalibration"]:
calib[:] = np.array(sm["liveCalibration"].rpyCalib)
t1 = time.perf_counter()
model_output, dsp_execution_time = model.run(buf, calib)
t2 = time.perf_counter()
pm.send("driverStateV2", get_driverstate_packet(model_output, vipc_client.frame_id, vipc_client.timestamp_sof, t2 - t1, dsp_execution_time))
# print("dmonitoring process: %.2fms, from last %.2fms\n" % (t2 - t1, t1 - last))
# last = t1
if __name__ == "__main__":
main()