Files
clearpilot/selfdrive/modeld/modeld.py
Brian Hanson cf2b3fc637
Some checks failed
prebuilt / build prebuilt (push) Has been cancelled
badges / create badges (push) Has been cancelled
modeld: revert to constant 20fps (except standby at standstill)
Disabled the variable-rate (4/10/20fps) logic — it caused persistent
downstream freq/valid cascades that broke paramsd's angle-offset
learner, calibrationd, and locationd's one-shot filterInitialized latch.
Symptom: user's "TAKE CONTROL IMMEDIATELY / Communication Issue"
banner during engaged driving, plus subtle right-pull (paramsd's fast
angle-offset adaptation was frozen all session).

Simpler model now:
  standstill → standby (0fps, paused)
  otherwise  → 20fps (no variable rate)

Republish-caching and FPCC.latRequested/noLatLaneChange plumbing left in
place so re-enabling variable rate later is a one-line change
(full_rate = True → the original full_rate expression).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-17 18:21:46 -05:00

421 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import time
import pickle
import numpy as np
import cereal.messaging as messaging
from cereal import car, log
from pathlib import Path
from setproctitle import setproctitle
from cereal.messaging import PubMaster, SubMaster
from cereal.visionipc import VisionIpcClient, VisionStreamType, VisionBuf
from openpilot.common.swaglog import cloudlog
from openpilot.common.params import Params
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.realtime import config_realtime_process
from openpilot.common.transformations.camera import DEVICE_CAMERAS
from openpilot.common.transformations.model import get_warp_matrix
from openpilot.selfdrive import sentry
from openpilot.selfdrive.car.car_helpers import get_demo_car_params
from openpilot.selfdrive.controls.lib.desire_helper import DesireHelper
from openpilot.selfdrive.modeld.runners import ModelRunner, Runtime
from openpilot.selfdrive.modeld.parse_model_outputs import Parser
from openpilot.selfdrive.modeld.fill_model_msg import fill_model_msg, fill_pose_msg, PublishState
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.modeld.models.commonmodel_pyx import ModelFrame, CLContext
from openpilot.selfdrive.frogpilot.controls.lib.model_manager import DEFAULT_MODEL, MODELS_PATH, NAVIGATIONLESS_MODELS, RADARLESS_MODELS
PROCESS_NAME = "selfdrive.modeld.modeld"
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED')
MODEL_NAME = Params().get("Model", encoding='utf-8')
DISABLE_NAV = MODEL_NAME in NAVIGATIONLESS_MODELS
DISABLE_RADAR = MODEL_NAME in RADARLESS_MODELS
MODEL_PATHS = {
ModelRunner.THNEED: Path(__file__).parent / ('models/supercombo.thneed' if MODEL_NAME == DEFAULT_MODEL else f'{MODELS_PATH}/{MODEL_NAME}.thneed'),
ModelRunner.ONNX: Path(__file__).parent / 'models/supercombo.onnx'}
METADATA_PATH = Path(__file__).parent / 'models/supercombo_metadata.pkl'
class FrameMeta:
frame_id: int = 0
timestamp_sof: int = 0
timestamp_eof: int = 0
def __init__(self, vipc=None):
if vipc is not None:
self.frame_id, self.timestamp_sof, self.timestamp_eof = vipc.frame_id, vipc.timestamp_sof, vipc.timestamp_eof
class ModelState:
frame: ModelFrame
wide_frame: ModelFrame
inputs: dict[str, np.ndarray]
output: np.ndarray
prev_desire: np.ndarray # for tracking the rising edge of the pulse
model: ModelRunner
def __init__(self, context: CLContext):
self.frame = ModelFrame(context)
self.wide_frame = ModelFrame(context)
self.prev_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32)
self.inputs = {
'desire': np.zeros(ModelConstants.DESIRE_LEN * (ModelConstants.HISTORY_BUFFER_LEN+1), dtype=np.float32),
'traffic_convention': np.zeros(ModelConstants.TRAFFIC_CONVENTION_LEN, dtype=np.float32),
'lateral_control_params': np.zeros(ModelConstants.LATERAL_CONTROL_PARAMS_LEN, dtype=np.float32),
'prev_desired_curv': np.zeros(ModelConstants.PREV_DESIRED_CURV_LEN * (ModelConstants.HISTORY_BUFFER_LEN+1), dtype=np.float32),
**({'nav_features': np.zeros(ModelConstants.NAV_FEATURE_LEN, dtype=np.float32),
'nav_instructions': np.zeros(ModelConstants.NAV_INSTRUCTION_LEN, dtype=np.float32)} if not DISABLE_NAV else {}),
'features_buffer': np.zeros(ModelConstants.HISTORY_BUFFER_LEN * ModelConstants.FEATURE_LEN, dtype=np.float32),
**({'radar_tracks': np.zeros(ModelConstants.RADAR_TRACKS_LEN * ModelConstants.RADAR_TRACKS_WIDTH, dtype=np.float32)} if DISABLE_RADAR else {}),
}
with open(METADATA_PATH, 'rb') as f:
model_metadata = pickle.load(f)
self.output_slices = model_metadata['output_slices']
net_output_size = model_metadata['output_shapes']['outputs'][1]
self.output = np.zeros(net_output_size, dtype=np.float32)
self.parser = Parser()
self.model = ModelRunner(MODEL_PATHS, self.output, Runtime.GPU, False, context)
self.model.addInput("input_imgs", None)
self.model.addInput("big_input_imgs", None)
for k,v in self.inputs.items():
self.model.addInput(k, v)
def slice_outputs(self, model_outputs: np.ndarray) -> dict[str, np.ndarray]:
parsed_model_outputs = {k: model_outputs[np.newaxis, v] for k,v in self.output_slices.items()}
if SEND_RAW_PRED:
parsed_model_outputs['raw_pred'] = model_outputs.copy()
return parsed_model_outputs
def run(self, buf: VisionBuf, wbuf: VisionBuf, transform: np.ndarray, transform_wide: np.ndarray,
inputs: dict[str, np.ndarray], prepare_only: bool) -> dict[str, np.ndarray] | None:
# Model decides when action is completed, so desire input is just a pulse triggered on rising edge
inputs['desire'][0] = 0
self.inputs['desire'][:-ModelConstants.DESIRE_LEN] = self.inputs['desire'][ModelConstants.DESIRE_LEN:]
self.inputs['desire'][-ModelConstants.DESIRE_LEN:] = np.where(inputs['desire'] - self.prev_desire > .99, inputs['desire'], 0)
self.prev_desire[:] = inputs['desire']
self.inputs['traffic_convention'][:] = inputs['traffic_convention']
self.inputs['lateral_control_params'][:] = inputs['lateral_control_params']
if not DISABLE_NAV:
self.inputs['nav_features'][:] = inputs['nav_features']
self.inputs['nav_instructions'][:] = inputs['nav_instructions']
if DISABLE_RADAR:
self.inputs['radar_tracks'][:] = inputs['radar_tracks']
# if getCLBuffer is not None, frame will be None
self.model.setInputBuffer("input_imgs", self.frame.prepare(buf, transform.flatten(), self.model.getCLBuffer("input_imgs")))
if wbuf is not None:
self.model.setInputBuffer("big_input_imgs", self.wide_frame.prepare(wbuf, transform_wide.flatten(), self.model.getCLBuffer("big_input_imgs")))
if prepare_only:
return None
self.model.execute()
outputs = self.parser.parse_outputs(self.slice_outputs(self.output))
self.inputs['features_buffer'][:-ModelConstants.FEATURE_LEN] = self.inputs['features_buffer'][ModelConstants.FEATURE_LEN:]
self.inputs['features_buffer'][-ModelConstants.FEATURE_LEN:] = outputs['hidden_state'][0, :]
self.inputs['prev_desired_curv'][:-ModelConstants.PREV_DESIRED_CURV_LEN] = self.inputs['prev_desired_curv'][ModelConstants.PREV_DESIRED_CURV_LEN:]
self.inputs['prev_desired_curv'][-ModelConstants.PREV_DESIRED_CURV_LEN:] = outputs['desired_curvature'][0, :]
return outputs
def main(demo=False):
cloudlog.warning("modeld init")
sentry.set_tag("daemon", PROCESS_NAME)
cloudlog.bind(daemon=PROCESS_NAME)
setproctitle(PROCESS_NAME)
config_realtime_process(7, 54)
import time as _time
cloudlog.warning("setting up CL context")
_t0 = _time.monotonic()
cl_context = CLContext()
_t1 = _time.monotonic()
cloudlog.warning("CL context ready in %.3fs; loading model", _t1 - _t0)
model = ModelState(cl_context)
_t2 = _time.monotonic()
cloudlog.warning("model loaded in %.3fs (total init %.3fs), modeld starting", _t2 - _t1, _t2 - _t0)
# visionipc clients
while True:
available_streams = VisionIpcClient.available_streams("camerad", block=False)
if available_streams:
use_extra_client = VisionStreamType.VISION_STREAM_WIDE_ROAD in available_streams and VisionStreamType.VISION_STREAM_ROAD in available_streams
main_wide_camera = VisionStreamType.VISION_STREAM_ROAD not in available_streams
break
time.sleep(.1)
vipc_client_main_stream = VisionStreamType.VISION_STREAM_WIDE_ROAD if main_wide_camera else VisionStreamType.VISION_STREAM_ROAD
vipc_client_main = VisionIpcClient("camerad", vipc_client_main_stream, True, cl_context)
vipc_client_extra = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_WIDE_ROAD, False, cl_context)
cloudlog.warning(f"vision stream set up, main_wide_camera: {main_wide_camera}, use_extra_client: {use_extra_client}")
while not vipc_client_main.connect(False):
time.sleep(0.1)
while use_extra_client and not vipc_client_extra.connect(False):
time.sleep(0.1)
cloudlog.warning(f"connected main cam with buffer size: {vipc_client_main.buffer_len} ({vipc_client_main.width} x {vipc_client_main.height})")
if use_extra_client:
cloudlog.warning(f"connected extra cam with buffer size: {vipc_client_extra.buffer_len} ({vipc_client_extra.width} x {vipc_client_extra.height})")
# messaging
pm = PubMaster(["modelV2", "cameraOdometry"])
sm = SubMaster(["deviceState", "carState", "roadCameraState", "liveCalibration", "driverMonitoringState", "navModel", "navInstruction", "carControl", "frogpilotCarControl", "liveTracks", "frogpilotPlan"])
publish_state = PublishState()
params = Params()
# setup filter to track dropped frames
frame_dropped_filter = FirstOrderFilter(0., 10., 1. / ModelConstants.MODEL_FREQ)
frame_id = 0
last_vipc_frame_id = 0
run_count = 0
model_transform_main = np.zeros((3, 3), dtype=np.float32)
model_transform_extra = np.zeros((3, 3), dtype=np.float32)
live_calib_seen = False
model_standby = False
last_standby_ts_write = 0
params_memory = Params("/dev/shm/params")
# CLEARPILOT: cache last model output for republishing on skip cycles. Keeps downstream
# cameraOdometry/modelV2 rate constant at 20Hz so services.py freq checks never fail
# during reduced-rate mode. Content is stale but we only reduce rate when not engaged,
# so no one is actually using it for control.
last_model_output = None
nav_features = np.zeros(ModelConstants.NAV_FEATURE_LEN, dtype=np.float32)
nav_instructions = np.zeros(ModelConstants.NAV_INSTRUCTION_LEN, dtype=np.float32)
buf_main, buf_extra = None, None
meta_main = FrameMeta()
meta_extra = FrameMeta()
if demo:
CP = get_demo_car_params()
else:
with car.CarParams.from_bytes(params.get("CarParams", block=True)) as msg:
CP = msg
cloudlog.info("modeld got CarParams: %s", CP.carName)
# TODO this needs more thought, use .2s extra for now to estimate other delays
steer_delay = CP.steerActuatorDelay + .2
DH = DesireHelper()
while True:
# Keep receiving frames until we are at least 1 frame ahead of previous extra frame
while meta_main.timestamp_sof < meta_extra.timestamp_sof + 25000000:
buf_main = vipc_client_main.recv()
meta_main = FrameMeta(vipc_client_main)
if buf_main is None:
break
if buf_main is None:
cloudlog.error("vipc_client_main no frame")
continue
if use_extra_client:
# Keep receiving extra frames until frame id matches main camera
while True:
buf_extra = vipc_client_extra.recv()
meta_extra = FrameMeta(vipc_client_extra)
if buf_extra is None or meta_main.timestamp_sof < meta_extra.timestamp_sof + 25000000:
break
if buf_extra is None:
cloudlog.error("vipc_client_extra no frame")
continue
if abs(meta_main.timestamp_sof - meta_extra.timestamp_sof) > 10000000:
cloudlog.error("frames out of sync! main: {} ({:.5f}), extra: {} ({:.5f})".format(
meta_main.frame_id, meta_main.timestamp_sof / 1e9,
meta_extra.frame_id, meta_extra.timestamp_sof / 1e9))
else:
# Use single camera
buf_extra = buf_main
meta_extra = meta_main
sm.update(0)
# CLEARPILOT: reverted to two modes only — constant 20fps while moving, standby at
# standstill. Variable-rate (4/10fps) caused downstream freq/valid cascades that
# broke paramsd's angleOffset learner, calibrationd, and locationd's filter init.
# Keep the signals plumbed through frogpilotCarControl for when we re-enable.
fpcc = sm['frogpilotCarControl']
lat_active = fpcc.latRequested # noqa: F841
lane_changing = fpcc.noLatLaneChange # noqa: F841
standstill = sm['carState'].standstill
calibrating = sm['liveCalibration'].calStatus != log.LiveCalibrationData.Status.calibrated # noqa: F841
full_rate = True # always 20fps when not standstill
# Standby transitions: standstill → 0fps (parked), otherwise → 20fps
should_standby = standstill
if should_standby and not model_standby:
params_memory.put_bool("ModelStandby", True)
model_standby = True
cloudlog.warning("modeld: standby ON (standstill)")
elif not should_standby and model_standby:
params_memory.put_bool("ModelStandby", False)
model_standby = False
run_count = 0
frame_dropped_filter.x = 0.
cloudlog.warning("modeld: standby OFF")
if model_standby:
now = _time.monotonic()
if now - last_standby_ts_write > 1.0:
params_memory.put("ModelStandbyTs", str(now))
last_standby_ts_write = now
last_vipc_frame_id = meta_main.frame_id
continue
# CLEARPILOT: reduced framerate: skip GPU inference on most frames but still
# republish cached output at full 20Hz so downstream services never see a rate
# drop (avoids freq_ok → valid cascade that causes "Communication Issue" false
# positives on engage). Daylight: skip 1/2 (compute at 10fps), night: skip 4/5
# (compute at 4fps). ModelStandbyTs still written for model_suppress window.
republish_only = False
if not full_rate:
is_daylight = params_memory.get_bool("IsDaylight")
skip_interval = 2 if is_daylight else 5
target_fps = b"10" if is_daylight else b"4"
if params_memory.get("ModelFps") != target_fps:
params_memory.put("ModelFps", target_fps.decode())
now = _time.monotonic()
if now - last_standby_ts_write > 1.0:
params_memory.put("ModelStandbyTs", str(now))
last_standby_ts_write = now
if run_count % skip_interval != 0:
republish_only = True
else:
if params_memory.get("ModelFps") != b"20":
params_memory.put("ModelFps", "20")
desire = DH.desire
is_rhd = sm["driverMonitoringState"].isRHD
frame_id = sm["roadCameraState"].frameId
lateral_control_params = np.array([sm["carState"].vEgo, steer_delay], dtype=np.float32)
if sm.updated["liveCalibration"] and sm.seen['roadCameraState'] and sm.seen['deviceState']:
device_from_calib_euler = np.array(sm["liveCalibration"].rpyCalib, dtype=np.float32)
dc = DEVICE_CAMERAS[(str(sm['deviceState'].deviceType), str(sm['roadCameraState'].sensor))]
model_transform_main = get_warp_matrix(device_from_calib_euler, dc.ecam.intrinsics if main_wide_camera else dc.fcam.intrinsics, False).astype(np.float32)
model_transform_extra = get_warp_matrix(device_from_calib_euler, dc.ecam.intrinsics, True).astype(np.float32)
live_calib_seen = True
traffic_convention = np.zeros(2)
traffic_convention[int(is_rhd)] = 1
vec_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32)
if desire >= 0 and desire < ModelConstants.DESIRE_LEN:
vec_desire[desire] = 1
# Enable/disable nav features
timestamp_llk = sm["navModel"].locationMonoTime
nav_valid = sm.valid["navModel"] # and (nanos_since_boot() - timestamp_llk < 1e9)
nav_enabled = nav_valid and not DISABLE_NAV
if not nav_enabled:
nav_features[:] = 0
nav_instructions[:] = 0
if nav_enabled and sm.updated["navModel"]:
nav_features = np.array(sm["navModel"].features)
if nav_enabled and sm.updated["navInstruction"]:
nav_instructions[:] = 0
for maneuver in sm["navInstruction"].allManeuvers:
distance_idx = 25 + int(maneuver.distance / 20)
direction_idx = 0
if maneuver.modifier in ("left", "slight left", "sharp left"):
direction_idx = 1
if maneuver.modifier in ("right", "slight right", "sharp right"):
direction_idx = 2
if 0 <= distance_idx < 50:
nav_instructions[distance_idx*3 + direction_idx] = 1
radar_tracks = np.zeros(ModelConstants.RADAR_TRACKS_LEN * ModelConstants.RADAR_TRACKS_WIDTH, dtype=np.float32)
if sm.updated["liveTracks"]:
for i, track in enumerate(sm["liveTracks"]):
if i >= ModelConstants.RADAR_TRACKS_LEN:
break
vec_index = i * ModelConstants.RADAR_TRACKS_WIDTH
radar_tracks[vec_index:vec_index+ModelConstants.RADAR_TRACKS_WIDTH] = [track.dRel, track.yRel, track.vRel]
# tracked dropped frames
vipc_dropped_frames = max(0, meta_main.frame_id - last_vipc_frame_id - 1)
frames_dropped = frame_dropped_filter.update(min(vipc_dropped_frames, 10))
if run_count < 10: # let frame drops warm up
frame_dropped_filter.x = 0.
frames_dropped = 0.
run_count = run_count + 1
frame_drop_ratio = frames_dropped / (1 + frames_dropped)
prepare_only = vipc_dropped_frames > 0
if prepare_only:
cloudlog.error(f"skipping model eval. Dropped {vipc_dropped_frames} frames")
inputs:dict[str, np.ndarray] = {
'desire': vec_desire,
'traffic_convention': traffic_convention,
'lateral_control_params': lateral_control_params,
**({'nav_features': nav_features, 'nav_instructions': nav_instructions} if not DISABLE_NAV else {}),
**({'radar_tracks': radar_tracks,} if DISABLE_RADAR else {}),
}
# CLEARPILOT: on republish cycles, skip the GPU inference and reuse the last
# model output. Publish rate stays at 20Hz, compute rate is reduced.
if republish_only and last_model_output is not None:
model_output = last_model_output
model_execution_time = 0.0
else:
mt1 = time.perf_counter()
model_output = model.run(buf_main, buf_extra, model_transform_main, model_transform_extra, inputs, prepare_only)
mt2 = time.perf_counter()
model_execution_time = mt2 - mt1
if model_output is not None:
# cache for next republish cycle
last_model_output = model_output
modelv2_send = messaging.new_message('modelV2')
posenet_send = messaging.new_message('cameraOdometry')
fill_model_msg(modelv2_send, model_output, publish_state, meta_main.frame_id, meta_extra.frame_id, frame_id, frame_drop_ratio,
meta_main.timestamp_eof, timestamp_llk, model_execution_time, nav_enabled, live_calib_seen)
desire_state = modelv2_send.modelV2.meta.desireState
l_lane_change_prob = desire_state[log.Desire.laneChangeLeft]
r_lane_change_prob = desire_state[log.Desire.laneChangeRight]
lane_change_prob = l_lane_change_prob + r_lane_change_prob
DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob, sm['frogpilotPlan'])
modelv2_send.modelV2.meta.laneChangeState = DH.lane_change_state
modelv2_send.modelV2.meta.laneChangeDirection = DH.lane_change_direction
modelv2_send.modelV2.meta.turnDirection = DH.turn_direction
fill_pose_msg(posenet_send, model_output, meta_main.frame_id, vipc_dropped_frames, meta_main.timestamp_eof, live_calib_seen)
pm.send('modelV2', modelv2_send)
pm.send('cameraOdometry', posenet_send)
last_vipc_frame_id = meta_main.frame_id
if __name__ == "__main__":
try:
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--demo', action='store_true', help='A boolean for demo mode.')
args = parser.parse_args()
main(demo=args.demo)
except KeyboardInterrupt:
cloudlog.warning(f"child {PROCESS_NAME} got SIGINT")
except Exception:
sentry.capture_exception()
raise