clearpilot: initial commit of full source

This commit is contained in:
2026-04-11 06:25:25 +00:00
commit e2a0c1894a
3383 changed files with 834683 additions and 0 deletions

View File

View File

@@ -0,0 +1,61 @@
import copy
import os
import json
from collections import defaultdict
from dataclasses import dataclass
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.events import Alert
with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) as f:
OFFROAD_ALERTS = json.load(f)
def set_offroad_alert(alert: str, show_alert: bool, extra_text: str = None) -> None:
if show_alert:
a = copy.copy(OFFROAD_ALERTS[alert])
a['extra'] = extra_text or ''
Params().put(alert, json.dumps(a))
else:
Params().remove(alert)
@dataclass
class AlertEntry:
alert: Alert | None = None
start_frame: int = -1
end_frame: int = -1
def active(self, frame: int) -> bool:
return frame <= self.end_frame
class AlertManager:
def __init__(self):
self.alerts: dict[str, AlertEntry] = defaultdict(AlertEntry)
def add_many(self, frame: int, alerts: list[Alert]) -> None:
for alert in alerts:
entry = self.alerts[alert.alert_type]
entry.alert = alert
if not entry.active(frame):
entry.start_frame = frame
min_end_frame = entry.start_frame + alert.duration
entry.end_frame = max(frame + 1, min_end_frame)
def process_alerts(self, frame: int, clear_event_types: set) -> Alert | None:
current_alert = AlertEntry()
for v in self.alerts.values():
if not v.alert:
continue
if v.alert.event_type in clear_event_types:
v.end_frame = -1
# sort by priority first and then by start_frame
greater = current_alert.alert is None or (v.alert.priority, v.start_frame) > (current_alert.alert.priority, current_alert.start_frame)
if v.active(frame) and greater:
current_alert = v
return current_alert.alert

View File

@@ -0,0 +1,56 @@
{
"Offroad_TemperatureTooHigh": {
"text": "Device temperature too high. System cooling down before starting. Current internal component temperature: %1",
"severity": 1
},
"Offroad_ConnectivityNeededPrompt": {
"text": "Immediately connect to the internet to check for updates. If you do not connect to the internet, openpilot won't engage in %1",
"severity": 0,
"_comment": "Set extra field to number of days"
},
"Offroad_ConnectivityNeeded": {
"text": "Connect to internet to check for updates. openpilot won't automatically start until it connects to internet to check for updates.",
"severity": 1
},
"Offroad_UpdateFailed": {
"text": "Unable to download updates\n%1",
"severity": 1,
"_comment": "Set extra field to the failed reason."
},
"Offroad_InvalidTime": {
"text": "Invalid date and time settings, system won't start. Connect to internet to set time.",
"severity": 1
},
"Offroad_IsTakingSnapshot": {
"text": "Taking camera snapshots. System won't start until finished.",
"severity": 0
},
"Offroad_NeosUpdate": {
"text": "An update to your device's operating system is downloading in the background. You will be prompted to update when it's ready to install.",
"severity": 0
},
"Offroad_UnofficialHardware": {
"text": "Device failed to register. It will not connect to or upload to comma.ai servers, and receives no support from comma.ai. If this is an official device, visit https://comma.ai/support.",
"severity": 1
},
"Offroad_StorageMissing": {
"text": "NVMe drive not mounted.",
"severity": 1
},
"Offroad_BadNvme": {
"text": "Unsupported NVMe drive detected. Device may draw significantly more power and overheat due to the unsupported NVMe.",
"severity": 1
},
"Offroad_CarUnrecognized": {
"text": "openpilot was unable to identify your car. Your car is either unsupported or its ECUs are not recognized. Please submit a pull request to add the firmware versions to the proper vehicle. Need help? Join discord.comma.ai.",
"severity": 0
},
"Offroad_NoFirmware": {
"text": "openpilot was unable to identify your car. Check integrity of cables and ensure all connections are secure, particularly that the comma power is fully inserted in the OBD-II port of the vehicle. Need help? Join discord.comma.ai.",
"severity": 0
},
"Offroad_Recalibration": {
"text": "openpilot detected a change in the device's mounting position. Ensure the device is fully seated in the mount and the mount is firmly secured to the windshield.",
"severity": 0
}
}

View File

@@ -0,0 +1,190 @@
from cereal import log
from openpilot.common.conversions import Conversions as CV
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
LaneChangeState = log.LaneChangeState
LaneChangeDirection = log.LaneChangeDirection
TurnDirection = log.Desire
LANE_CHANGE_SPEED_MIN = 20 * CV.MPH_TO_MS
LANE_CHANGE_TIME_MAX = 10.
DESIRES = {
LaneChangeDirection.none: {
LaneChangeState.off: log.Desire.none,
LaneChangeState.preLaneChange: log.Desire.none,
LaneChangeState.laneChangeStarting: log.Desire.none,
LaneChangeState.laneChangeFinishing: log.Desire.none,
},
LaneChangeDirection.left: {
LaneChangeState.off: log.Desire.none,
LaneChangeState.preLaneChange: log.Desire.none,
LaneChangeState.laneChangeStarting: log.Desire.laneChangeLeft,
LaneChangeState.laneChangeFinishing: log.Desire.laneChangeLeft,
},
LaneChangeDirection.right: {
LaneChangeState.off: log.Desire.none,
LaneChangeState.preLaneChange: log.Desire.none,
LaneChangeState.laneChangeStarting: log.Desire.laneChangeRight,
LaneChangeState.laneChangeFinishing: log.Desire.laneChangeRight,
},
}
TURN_DESIRES = {
TurnDirection.none: log.Desire.none,
TurnDirection.turnLeft: log.Desire.turnLeft,
TurnDirection.turnRight: log.Desire.turnRight,
}
class DesireHelper:
def __init__(self):
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
self.lane_change_timer = 0.0
self.lane_change_ll_prob = 1.0
self.keep_pulse_timer = 0.0
self.prev_one_blinker = False
self.desire = log.Desire.none
# FrogPilot variables
self.params = Params()
self.params_memory = Params("/dev/shm/params")
self.turn_direction = TurnDirection.none
self.lane_change_completed = False
self.turn_completed = False
self.lane_change_wait_timer = 0
self.update_frogpilot_params()
def update(self, carstate, lateral_active, lane_change_prob, frogpilotPlan):
# Clearpilot test lane change no lat
clearpilot_disable_lat_on_lane_change = True
# Todo, we want to count how many lanes we have moved during this process either left or right.
# Then, if blinker left, but we have traveled rightwards, reenable lateral and vice versa.
# Also, if we are less than 25% twoards our lane change and we get a blind spot, reenable lateral
# with influence back to our lane on wheel.
v_ego = carstate.vEgo
one_blinker = carstate.leftBlinker != carstate.rightBlinker
below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN
if not (self.lane_detection and one_blinker) or below_lane_change_speed:
lane_available = True
else:
desired_lane = frogpilotPlan.laneWidthLeft if carstate.leftBlinker else frogpilotPlan.laneWidthRight
lane_available = desired_lane >= self.lane_detection_width
if (not clearpilot_disable_lat_on_lane_change and not lateral_active) or self.lane_change_timer > LANE_CHANGE_TIME_MAX:
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
self.turn_direction = TurnDirection.none
elif one_blinker and below_lane_change_speed and self.turn_desires:
self.turn_direction = TurnDirection.turnLeft if carstate.leftBlinker else TurnDirection.turnRight
# Set the "turn_completed" flag to prevent lane changes after completing a turn
self.turn_completed = True
else:
self.turn_direction = TurnDirection.none
# LaneChangeState.off
if self.lane_change_state == LaneChangeState.off and one_blinker and not self.prev_one_blinker and not below_lane_change_speed:
self.lane_change_state = LaneChangeState.preLaneChange
self.lane_change_ll_prob = 1.0
self.lane_change_wait_timer = 0
# LaneChangeState.preLaneChange
elif self.lane_change_state == LaneChangeState.preLaneChange:
# Set lane change direction
self.lane_change_direction = LaneChangeDirection.left if \
carstate.leftBlinker else LaneChangeDirection.right
torque_applied = carstate.steeringPressed and \
((carstate.steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
blindspot_detected = ((carstate.leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or
(carstate.rightBlindspot and self.lane_change_direction == LaneChangeDirection.right))
self.lane_change_wait_timer += DT_MDL
if self.nudgeless and lane_available and not self.lane_change_completed and self.lane_change_wait_timer >= self.lane_change_delay:
torque_applied = True
self.lane_change_wait_timer = 0
if not one_blinker or below_lane_change_speed:
self.lane_change_state = LaneChangeState.off
self.lane_change_direction = LaneChangeDirection.none
elif torque_applied and not blindspot_detected:
self.lane_change_state = LaneChangeState.laneChangeStarting
self.lane_change_completed = True if self.one_lane_change else False
# LaneChangeState.laneChangeStarting
elif self.lane_change_state == LaneChangeState.laneChangeStarting:
if clearpilot_disable_lat_on_lane_change:
if not one_blinker:
self.lane_change_state = LaneChangeState.laneChangeFinishing
else:
# fade out over .5s
self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0)
# 98% certainty
if lane_change_prob < 0.02 and self.lane_change_ll_prob < 0.01:
self.lane_change_state = LaneChangeState.laneChangeFinishing
# LaneChangeState.laneChangeFinishing
elif self.lane_change_state == LaneChangeState.laneChangeFinishing:
# fade in laneline over 1s
self.lane_change_ll_prob = min(self.lane_change_ll_prob + DT_MDL, 1.0)
if self.lane_change_ll_prob > 0.99:
self.lane_change_direction = LaneChangeDirection.none
if one_blinker:
self.lane_change_state = LaneChangeState.preLaneChange
else:
self.lane_change_state = LaneChangeState.off
if self.lane_change_state in (LaneChangeState.off, LaneChangeState.preLaneChange):
self.lane_change_timer = 0.0
else:
self.lane_change_timer += DT_MDL
self.lane_change_completed &= one_blinker
self.prev_one_blinker = one_blinker
self.turn_completed &= one_blinker
# Clearpilot test: disabling turn desires
# if self.turn_direction != TurnDirection.none:
# self.desire = TURN_DESIRES[self.turn_direction]
# elif not self.turn_completed:
# self.desire = DESIRES[self.lane_change_direction][self.lane_change_state]
# else:
# self.desire = log.Desire.none
# Send keep pulse once per second during LaneChangeStart.preLaneChange
if self.lane_change_state in (LaneChangeState.off, LaneChangeState.laneChangeStarting):
self.keep_pulse_timer = 0.0
elif self.lane_change_state == LaneChangeState.preLaneChange:
self.keep_pulse_timer += DT_MDL
if self.keep_pulse_timer > 1.0:
self.keep_pulse_timer = 0.0
elif self.desire in (log.Desire.keepLeft, log.Desire.keepRight):
self.desire = log.Desire.none
if self.params_memory.get_bool("FrogPilotTogglesUpdated"):
self.update_frogpilot_params()
def update_frogpilot_params(self):
is_metric = self.params.get_bool("IsMetric")
lateral_tune = self.params.get_bool("LateralTune")
self.turn_desires = lateral_tune and self.params.get_bool("TurnDesires")
self.nudgeless = self.params.get_bool("NudgelessLaneChange")
self.lane_change_delay = self.params.get_int("LaneChangeTime") if self.nudgeless else 0
self.lane_detection = self.nudgeless and self.params.get_int("LaneDetectionWidth") != 0
self.lane_detection_width = self.params.get_int("LaneDetectionWidth") * (1 if is_metric else CV.FOOT_TO_METER) / 10 if self.lane_detection else 0
self.one_lane_change = self.nudgeless and self.params.get_bool("OneLaneChange")

View File

@@ -0,0 +1,214 @@
import math
from cereal import car, log
from openpilot.common.conversions import Conversions as CV
from openpilot.common.numpy_fast import clip, interp
from openpilot.common.params import Params
from openpilot.common.realtime import DT_CTRL
# WARNING: this value was determined based on the model's training distribution,
# model predictions above this speed can be unpredictable
# V_CRUISE's are in kph
V_CRUISE_MIN = 8
V_CRUISE_MAX = 145
V_CRUISE_UNSET = 255
V_CRUISE_INITIAL = 40
V_CRUISE_INITIAL_EXPERIMENTAL_MODE = 105
IMPERIAL_INCREMENT = 1.6 # should be CV.MPH_TO_KPH, but this causes rounding errors
MIN_SPEED = 1.0
CONTROL_N = 17
CAR_ROTATION_RADIUS = 0.0
# EU guidelines
MAX_LATERAL_JERK = 5.0
MAX_VEL_ERR = 5.0
ButtonEvent = car.CarState.ButtonEvent
ButtonType = car.CarState.ButtonEvent.Type
CRUISE_LONG_PRESS = 50
CRUISE_NEAREST_FUNC = {
ButtonType.accelCruise: math.ceil,
ButtonType.decelCruise: math.floor,
}
CRUISE_INTERVAL_SIGN = {
ButtonType.accelCruise: +1,
ButtonType.decelCruise: -1,
}
class VCruiseHelper:
def __init__(self, CP):
self.CP = CP
self.v_cruise_kph = V_CRUISE_UNSET
self.v_cruise_cluster_kph = V_CRUISE_UNSET
self.v_cruise_kph_last = 0
self.button_timers = {ButtonType.decelCruise: 0, ButtonType.accelCruise: 0}
self.button_change_states = {btn: {"standstill": False, "enabled": False} for btn in self.button_timers}
# FrogPilot variables
self.params_memory = Params("/dev/shm/params")
@property
def v_cruise_initialized(self):
return self.v_cruise_kph != V_CRUISE_UNSET
def update_v_cruise(self, CS, enabled, is_metric, speed_limit_changed, frogpilot_variables):
self.v_cruise_kph_last = self.v_cruise_kph
if CS.cruiseState.available:
if not self.CP.pcmCruise:
# if stock cruise is completely disabled, then we can use our own set speed logic
self._update_v_cruise_non_pcm(CS, enabled, is_metric, speed_limit_changed, frogpilot_variables)
self.v_cruise_cluster_kph = self.v_cruise_kph
self.update_button_timers(CS, enabled)
else:
self.v_cruise_kph = CS.cruiseState.speed * CV.MS_TO_KPH
self.v_cruise_cluster_kph = CS.cruiseState.speedCluster * CV.MS_TO_KPH
else:
self.v_cruise_kph = V_CRUISE_UNSET
self.v_cruise_cluster_kph = V_CRUISE_UNSET
def _update_v_cruise_non_pcm(self, CS, enabled, is_metric, speed_limit_changed, frogpilot_variables):
# handle button presses. TODO: this should be in state_control, but a decelCruise press
# would have the effect of both enabling and changing speed is checked after the state transition
if not enabled:
return
long_press = False
button_type = None
v_cruise_delta = 1. if is_metric else IMPERIAL_INCREMENT
for b in CS.buttonEvents:
if b.type.raw in self.button_timers and not b.pressed:
if self.button_timers[b.type.raw] > CRUISE_LONG_PRESS:
return # end long press
button_type = b.type.raw
break
else:
for k in self.button_timers.keys():
if self.button_timers[k] and self.button_timers[k] % CRUISE_LONG_PRESS == 0:
button_type = k
long_press = True
break
if button_type is None:
return
# Confirm or deny the new speed limit value
if speed_limit_changed:
if button_type == ButtonType.accelCruise:
self.params_memory.put_bool("SLCConfirmed", True)
self.params_memory.put_bool("SLCConfirmedPressed", True)
return
elif button_type == ButtonType.decelCruise:
self.params_memory.put_bool("SLCConfirmed", False)
self.params_memory.put_bool("SLCConfirmedPressed", True)
return
# Don't adjust speed when pressing resume to exit standstill
cruise_standstill = self.button_change_states[button_type]["standstill"] or CS.cruiseState.standstill
if button_type == ButtonType.accelCruise and cruise_standstill:
return
# Don't adjust speed if we've enabled since the button was depressed (some ports enable on rising edge)
if not self.button_change_states[button_type]["enabled"]:
return
v_cruise_delta_interval = frogpilot_variables.custom_cruise_increase_long if long_press else frogpilot_variables.custom_cruise_increase
v_cruise_delta = v_cruise_delta * v_cruise_delta_interval
if v_cruise_delta_interval % 5 == 0 and self.v_cruise_kph % v_cruise_delta != 0: # partial interval
self.v_cruise_kph = CRUISE_NEAREST_FUNC[button_type](self.v_cruise_kph / v_cruise_delta) * v_cruise_delta
else:
self.v_cruise_kph += v_cruise_delta * CRUISE_INTERVAL_SIGN[button_type]
v_cruise_offset = (frogpilot_variables.set_speed_offset * CRUISE_INTERVAL_SIGN[button_type]) if long_press else 0
if v_cruise_offset < 0:
v_cruise_offset = frogpilot_variables.set_speed_offset - v_cruise_delta
self.v_cruise_kph += v_cruise_offset
# If set is pressed while overriding, clip cruise speed to minimum of vEgo
if CS.gasPressed and button_type in (ButtonType.decelCruise, ButtonType.setCruise):
self.v_cruise_kph = max(self.v_cruise_kph, CS.vEgo * CV.MS_TO_KPH)
self.v_cruise_kph = clip(round(self.v_cruise_kph, 1), V_CRUISE_MIN, V_CRUISE_MAX)
def update_button_timers(self, CS, enabled):
# increment timer for buttons still pressed
for k in self.button_timers:
if self.button_timers[k] > 0:
self.button_timers[k] += 1
for b in CS.buttonEvents:
if b.type.raw in self.button_timers:
# Start/end timer and store current state on change of button pressed
self.button_timers[b.type.raw] = 1 if b.pressed else 0
self.button_change_states[b.type.raw] = {"standstill": CS.cruiseState.standstill, "enabled": enabled}
def initialize_v_cruise(self, CS, experimental_mode: bool, desired_speed_limit, frogpilot_variables) -> None:
# initializing is handled by the PCM
if self.CP.pcmCruise:
return
initial = V_CRUISE_INITIAL_EXPERIMENTAL_MODE if experimental_mode else V_CRUISE_INITIAL
# 250kph or above probably means we never had a set speed
if any(b.type in (ButtonType.accelCruise, ButtonType.resumeCruise) for b in CS.buttonEvents) and self.v_cruise_kph_last < 250:
self.v_cruise_kph = self.v_cruise_kph_last
else:
if desired_speed_limit != 0 and frogpilot_variables.set_speed_limit:
self.v_cruise_kph = int(round(desired_speed_limit * CV.MS_TO_KPH))
else:
self.v_cruise_kph = int(round(clip(CS.vEgo * CV.MS_TO_KPH, initial, V_CRUISE_MAX)))
self.v_cruise_cluster_kph = self.v_cruise_kph
def apply_deadzone(error, deadzone):
if error > deadzone:
error -= deadzone
elif error < - deadzone:
error += deadzone
else:
error = 0.
return error
def apply_center_deadzone(error, deadzone):
if (error > - deadzone) and (error < deadzone):
error = 0.
return error
def rate_limit(new_value, last_value, dw_step, up_step):
return clip(new_value, last_value + dw_step, last_value + up_step)
def clip_curvature(v_ego, prev_curvature, new_curvature):
v_ego = max(MIN_SPEED, v_ego)
max_curvature_rate = MAX_LATERAL_JERK / (v_ego**2) # inexact calculation, check https://github.com/commaai/openpilot/pull/24755
safe_desired_curvature = clip(new_curvature,
prev_curvature - max_curvature_rate * DT_CTRL,
prev_curvature + max_curvature_rate * DT_CTRL)
return safe_desired_curvature
def get_friction(lateral_accel_error: float, lateral_accel_deadzone: float, friction_threshold: float,
torque_params: car.CarParams.LateralTorqueTuning, friction_compensation: bool) -> float:
friction_interp = interp(
apply_center_deadzone(lateral_accel_error, lateral_accel_deadzone),
[-friction_threshold, friction_threshold],
[-torque_params.friction, torque_params.friction]
)
friction = float(friction_interp) if friction_compensation else 0.0
return friction
def get_speed_error(modelV2: log.ModelDataV2, v_ego: float) -> float:
# ToDo: Try relative error, and absolute speed
if len(modelV2.temporalPose.trans):
vel_err = clip(modelV2.temporalPose.trans[0] - v_ego, -MAX_VEL_ERR, MAX_VEL_ERR)
return float(vel_err)
return 0.0

1222
selfdrive/controls/lib/events.py Executable file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,32 @@
from abc import abstractmethod, ABC
from openpilot.common.numpy_fast import clip
from openpilot.common.realtime import DT_CTRL
MIN_LATERAL_CONTROL_SPEED = 0.3 # m/s
class LatControl(ABC):
def __init__(self, CP, CI):
self.sat_count_rate = 1.0 * DT_CTRL
self.sat_limit = CP.steerLimitTimer
self.sat_count = 0.
self.sat_check_min_speed = 10.
# we define the steer torque scale as [-1.0...1.0]
self.steer_max = 1.0
@abstractmethod
def update(self, active, CS, VM, params, steer_limited, desired_curvature, llk, model_data=None):
pass
def reset(self):
self.sat_count = 0.
def _check_saturation(self, saturated, CS, steer_limited):
if saturated and CS.vEgo > self.sat_check_min_speed and not steer_limited and not CS.steeringPressed:
self.sat_count += self.sat_count_rate
else:
self.sat_count -= self.sat_count_rate
self.sat_count = clip(self.sat_count, 0.0, self.sat_limit)
return self.sat_count > (self.sat_limit - 1e-3)

View File

@@ -0,0 +1,29 @@
import math
from cereal import log
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
STEER_ANGLE_SATURATION_THRESHOLD = 2.5 # Degrees
class LatControlAngle(LatControl):
def __init__(self, CP, CI):
super().__init__(CP, CI)
self.sat_check_min_speed = 5.
def update(self, active, CS, VM, params, steer_limited, desired_curvature, llk, model_data=None):
angle_log = log.ControlsState.LateralAngleState.new_message()
if not active:
angle_log.active = False
angle_steers_des = float(CS.steeringAngleDeg)
else:
angle_log.active = True
angle_steers_des = math.degrees(VM.get_steer_from_curvature(-desired_curvature, CS.vEgo, params.roll))
angle_steers_des += params.angleOffsetDeg
angle_control_saturated = abs(angle_steers_des - CS.steeringAngleDeg) > STEER_ANGLE_SATURATION_THRESHOLD
angle_log.saturated = self._check_saturation(angle_control_saturated, CS, False)
angle_log.steeringAngleDeg = float(CS.steeringAngleDeg)
angle_log.steeringAngleDesiredDeg = angle_steers_des
return 0, float(angle_steers_des), angle_log

View File

@@ -0,0 +1,48 @@
import math
from cereal import log
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
from openpilot.selfdrive.controls.lib.pid import PIDController
class LatControlPID(LatControl):
def __init__(self, CP, CI):
super().__init__(CP, CI)
self.pid = PIDController((CP.lateralTuning.pid.kpBP, CP.lateralTuning.pid.kpV),
(CP.lateralTuning.pid.kiBP, CP.lateralTuning.pid.kiV),
k_f=CP.lateralTuning.pid.kf, pos_limit=self.steer_max, neg_limit=-self.steer_max)
self.get_steer_feedforward = CI.get_steer_feedforward_function()
def reset(self):
super().reset()
self.pid.reset()
def update(self, active, CS, VM, params, steer_limited, desired_curvature, llk, model_data=None):
pid_log = log.ControlsState.LateralPIDState.new_message()
pid_log.steeringAngleDeg = float(CS.steeringAngleDeg)
pid_log.steeringRateDeg = float(CS.steeringRateDeg)
angle_steers_des_no_offset = math.degrees(VM.get_steer_from_curvature(-desired_curvature, CS.vEgo, params.roll))
angle_steers_des = angle_steers_des_no_offset + params.angleOffsetDeg
error = angle_steers_des - CS.steeringAngleDeg
pid_log.steeringAngleDesiredDeg = angle_steers_des
pid_log.angleError = error
if not active:
output_steer = 0.0
pid_log.active = False
self.pid.reset()
else:
# offset does not contribute to resistive torque
steer_feedforward = self.get_steer_feedforward(angle_steers_des_no_offset, CS.vEgo)
output_steer = self.pid.update(error, override=CS.steeringPressed,
feedforward=steer_feedforward, speed=CS.vEgo)
pid_log.active = True
pid_log.p = self.pid.p
pid_log.i = self.pid.i
pid_log.f = self.pid.f
pid_log.output = output_steer
pid_log.saturated = self._check_saturation(self.steer_max - abs(output_steer) < 1e-3, CS, steer_limited)
return output_steer, angle_steers_des, pid_log

View File

@@ -0,0 +1,259 @@
from collections import deque
import math
import numpy as np
from cereal import log
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.numpy_fast import interp
from openpilot.selfdrive.car.interfaces import LatControlInputs
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, apply_deadzone
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
from openpilot.selfdrive.controls.lib.pid import PIDController
from openpilot.selfdrive.controls.lib.vehicle_model import ACCELERATION_DUE_TO_GRAVITY
from openpilot.selfdrive.modeld.constants import ModelConstants
# At higher speeds (25+mph) we can assume:
# Lateral acceleration achieved by a specific car correlates to
# torque applied to the steering rack. It does not correlate to
# wheel slip, or to speed.
# This controller applies torque to achieve desired lateral
# accelerations. To compensate for the low speed effects we
# use a LOW_SPEED_FACTOR in the error. Additionally, there is
# friction in the steering wheel that needs to be overcome to
# move it at all, this is compensated for too.
LOW_SPEED_X = [0, 10, 20, 30]
LOW_SPEED_Y = [15, 13, 10, 5]
LOW_SPEED_Y_NN = [12, 3, 1, 0]
LAT_PLAN_MIN_IDX = 5
def get_predicted_lateral_jerk(lat_accels, t_diffs):
# compute finite difference between subsequent model_data.acceleration.y values
# this is just two calls of np.diff followed by an element-wise division
lat_accel_diffs = np.diff(lat_accels)
lat_jerk = lat_accel_diffs / t_diffs
# return as python list
return lat_jerk.tolist()
def sign(x):
return 1.0 if x > 0.0 else (-1.0 if x < 0.0 else 0.0)
def get_lookahead_value(future_vals, current_val):
if len(future_vals) == 0:
return current_val
same_sign_vals = [v for v in future_vals if sign(v) == sign(current_val)]
# if any future val has opposite sign of current val, return 0
if len(same_sign_vals) < len(future_vals):
return 0.0
# otherwise return the value with minimum absolute value
min_val = min(same_sign_vals + [current_val], key=lambda x: abs(x))
return min_val
# At a given roll, if pitch magnitude increases, the
# gravitational acceleration component starts pointing
# in the longitudinal direction, decreasing the lateral
# acceleration component. Here we do the same thing
# to the roll value itself, then passed to nnff.
def roll_pitch_adjust(roll, pitch):
return roll * math.cos(pitch)
class LatControlTorque(LatControl):
def __init__(self, CP, CI):
super().__init__(CP, CI)
self.torque_params = CP.lateralTuning.torque
self.pid = PIDController(self.torque_params.kp, self.torque_params.ki,
k_f=self.torque_params.kf, pos_limit=self.steer_max, neg_limit=-self.steer_max)
self.torque_from_lateral_accel = CI.torque_from_lateral_accel()
self.use_steering_angle = self.torque_params.useSteeringAngle
self.steering_angle_deadzone_deg = self.torque_params.steeringAngleDeadzoneDeg
# Twilsonco's Lateral Neural Network Feedforward
self.use_nnff = CI.use_nnff
self.use_nnff_lite = CI.use_nnff_lite
if self.use_nnff or self.use_nnff_lite:
# Instantaneous lateral jerk changes very rapidly, making it not useful on its own,
# however, we can "look ahead" to the future planned lateral jerk in order to guage
# whether the current desired lateral jerk will persist into the future, i.e.
# whether it's "deliberate" or not. This lets us simply ignore short-lived jerk.
# Note that LAT_PLAN_MIN_IDX is defined above and is used in order to prevent
# using a "future" value that is actually planned to occur before the "current" desired
# value, which is offset by the steerActuatorDelay.
self.friction_look_ahead_v = [1.4, 2.0] # how many seconds in the future to look ahead in [0, ~2.1] in 0.1 increments
self.friction_look_ahead_bp = [9.0, 30.0] # corresponding speeds in m/s in [0, ~40] in 1.0 increments
# Scaling the lateral acceleration "friction response" could be helpful for some.
# Increase for a stronger response, decrease for a weaker response.
self.lat_jerk_friction_factor = 0.4
self.lat_accel_friction_factor = 0.7 # in [0, 3], in 0.05 increments. 3 is arbitrary safety limit
# precompute time differences between ModelConstants.T_IDXS
self.t_diffs = np.diff(ModelConstants.T_IDXS)
self.desired_lat_jerk_time = CP.steerActuatorDelay + 0.3
if self.use_nnff:
self.pitch = FirstOrderFilter(0.0, 0.5, 0.01)
# NN model takes current v_ego, lateral_accel, lat accel/jerk error, roll, and past/future/planned data
# of lat accel and roll
# Past value is computed using previous desired lat accel and observed roll
self.torque_from_nn = CI.get_ff_nn
self.nn_friction_override = CI.lat_torque_nn_model.friction_override
# setup future time offsets
self.nn_time_offset = CP.steerActuatorDelay + 0.2
future_times = [0.3, 0.6, 1.0, 1.5] # seconds in the future
self.nn_future_times = [i + self.nn_time_offset for i in future_times]
self.nn_future_times_np = np.array(self.nn_future_times)
# setup past time offsets
self.past_times = [-0.3, -0.2, -0.1]
history_check_frames = [int(abs(i)*100) for i in self.past_times]
self.history_frame_offsets = [history_check_frames[0] - i for i in history_check_frames]
self.lateral_accel_desired_deque = deque(maxlen=history_check_frames[0])
self.roll_deque = deque(maxlen=history_check_frames[0])
self.error_deque = deque(maxlen=history_check_frames[0])
self.past_future_len = len(self.past_times) + len(self.nn_future_times)
def update_live_torque_params(self, latAccelFactor, latAccelOffset, friction):
self.torque_params.latAccelFactor = latAccelFactor
self.torque_params.latAccelOffset = latAccelOffset
self.torque_params.friction = friction
def update(self, active, CS, VM, params, steer_limited, desired_curvature, llk, model_data=None):
pid_log = log.ControlsState.LateralTorqueState.new_message()
nn_log = None
if not active:
output_torque = 0.0
pid_log.active = False
else:
actual_curvature_vm = -VM.calc_curvature(math.radians(CS.steeringAngleDeg - params.angleOffsetDeg), CS.vEgo, params.roll)
roll_compensation = params.roll * ACCELERATION_DUE_TO_GRAVITY
if self.use_steering_angle:
actual_curvature = actual_curvature_vm
curvature_deadzone = abs(VM.calc_curvature(math.radians(self.steering_angle_deadzone_deg), CS.vEgo, 0.0))
if self.use_nnff or self.use_nnff_lite:
actual_curvature_rate = -VM.calc_curvature(math.radians(CS.steeringRateDeg), CS.vEgo, 0.0)
actual_lateral_jerk = actual_curvature_rate * CS.vEgo ** 2
else:
actual_curvature_llk = llk.angularVelocityCalibrated.value[2] / CS.vEgo
actual_curvature = interp(CS.vEgo, [2.0, 5.0], [actual_curvature_vm, actual_curvature_llk])
curvature_deadzone = 0.0
actual_lateral_jerk = 0.0
desired_lateral_accel = desired_curvature * CS.vEgo ** 2
# desired rate is the desired rate of change in the setpoint, not the absolute desired curvature
# desired_lateral_jerk = desired_curvature_rate * CS.vEgo ** 2
actual_lateral_accel = actual_curvature * CS.vEgo ** 2
lateral_accel_deadzone = curvature_deadzone * CS.vEgo ** 2
low_speed_factor = interp(CS.vEgo, LOW_SPEED_X, LOW_SPEED_Y if not self.use_nnff else LOW_SPEED_Y_NN)**2
setpoint = desired_lateral_accel + low_speed_factor * desired_curvature
measurement = actual_lateral_accel + low_speed_factor * actual_curvature
lateral_jerk_setpoint = 0
lateral_jerk_measurement = 0
if self.use_nnff or self.use_nnff_lite:
# prepare "look-ahead" desired lateral jerk
lat_accel_friction_factor = self.lat_accel_friction_factor
if len(model_data.acceleration.y) == ModelConstants.IDX_N:
lookahead = interp(CS.vEgo, self.friction_look_ahead_bp, self.friction_look_ahead_v)
friction_upper_idx = next((i for i, val in enumerate(ModelConstants.T_IDXS) if val > lookahead), 16)
predicted_lateral_jerk = get_predicted_lateral_jerk(model_data.acceleration.y, self.t_diffs)
desired_lateral_jerk = (interp(self.desired_lat_jerk_time, ModelConstants.T_IDXS, model_data.acceleration.y) - actual_lateral_accel) / self.desired_lat_jerk_time
lookahead_lateral_jerk = get_lookahead_value(predicted_lateral_jerk[LAT_PLAN_MIN_IDX:friction_upper_idx], desired_lateral_jerk)
if self.use_steering_angle or lookahead_lateral_jerk == 0.0:
lookahead_lateral_jerk = 0.0
actual_lateral_jerk = 0.0
lat_accel_friction_factor = 1.0
lateral_jerk_setpoint = self.lat_jerk_friction_factor * lookahead_lateral_jerk
lateral_jerk_measurement = self.lat_jerk_friction_factor * actual_lateral_jerk
else:
lateral_jerk_setpoint = 0.0
lateral_jerk_measurement = 0.0
lookahead_lateral_jerk = 0.0
model_good = model_data is not None and len(model_data.orientation.x) >= CONTROL_N
if self.use_nnff and model_good:
# update past data
roll = params.roll
pitch = self.pitch.update(llk.calibratedOrientationNED.value[1]) if len(llk.calibratedOrientationNED.value) > 1 else 0.0
roll = roll_pitch_adjust(roll, pitch)
self.roll_deque.append(roll)
self.lateral_accel_desired_deque.append(desired_lateral_accel)
# prepare past and future values
# adjust future times to account for longitudinal acceleration
adjusted_future_times = [t + 0.5*CS.aEgo*(t/max(CS.vEgo, 1.0)) for t in self.nn_future_times]
past_rolls = [self.roll_deque[min(len(self.roll_deque)-1, i)] for i in self.history_frame_offsets]
future_rolls = [roll_pitch_adjust(interp(t, ModelConstants.T_IDXS, model_data.orientation.x) + roll, interp(t, ModelConstants.T_IDXS, model_data.orientation.y) + pitch) for t in adjusted_future_times]
past_lateral_accels_desired = [self.lateral_accel_desired_deque[min(len(self.lateral_accel_desired_deque)-1, i)] for i in self.history_frame_offsets]
future_planned_lateral_accels = [interp(t, ModelConstants.T_IDXS[:CONTROL_N], model_data.acceleration.y) for t in adjusted_future_times]
# compute NNFF error response
nnff_setpoint_input = [CS.vEgo, setpoint, lateral_jerk_setpoint, roll] \
+ [setpoint] * self.past_future_len \
+ past_rolls + future_rolls
# past lateral accel error shouldn't count, so use past desired like the setpoint input
nnff_measurement_input = [CS.vEgo, measurement, lateral_jerk_measurement, roll] \
+ [measurement] * self.past_future_len \
+ past_rolls + future_rolls
torque_from_setpoint = self.torque_from_nn(nnff_setpoint_input)
torque_from_measurement = self.torque_from_nn(nnff_measurement_input)
pid_log.error = torque_from_setpoint - torque_from_measurement
# compute feedforward (same as nn setpoint output)
error = setpoint - measurement
friction_input = lat_accel_friction_factor * error + self.lat_jerk_friction_factor * lookahead_lateral_jerk
nn_input = [CS.vEgo, desired_lateral_accel, friction_input, roll] \
+ past_lateral_accels_desired + future_planned_lateral_accels \
+ past_rolls + future_rolls
ff = self.torque_from_nn(nn_input)
# apply friction override for cars with low NN friction response
if self.nn_friction_override:
pid_log.error += self.torque_from_lateral_accel(LatControlInputs(0.0, 0.0, CS.vEgo, CS.aEgo), self.torque_params,
friction_input, lateral_accel_deadzone, friction_compensation=True, gravity_adjusted=False)
nn_log = nn_input + nnff_setpoint_input + nnff_measurement_input
else:
gravity_adjusted_lateral_accel = desired_lateral_accel - roll_compensation
torque_from_setpoint = self.torque_from_lateral_accel(LatControlInputs(setpoint, roll_compensation, CS.vEgo, CS.aEgo), self.torque_params,
lateral_jerk_setpoint, lateral_accel_deadzone, friction_compensation=self.use_nnff_lite, gravity_adjusted=False)
torque_from_measurement = self.torque_from_lateral_accel(LatControlInputs(measurement, roll_compensation, CS.vEgo, CS.aEgo), self.torque_params,
lateral_jerk_measurement, lateral_accel_deadzone, friction_compensation=self.use_nnff_lite, gravity_adjusted=False)
pid_log.error = torque_from_setpoint - torque_from_measurement
error = desired_lateral_accel - actual_lateral_accel
if self.use_nnff_lite:
friction_input = lat_accel_friction_factor * error + self.lat_jerk_friction_factor * lookahead_lateral_jerk
else:
friction_input = error
ff = self.torque_from_lateral_accel(LatControlInputs(gravity_adjusted_lateral_accel, roll_compensation, CS.vEgo, CS.aEgo), self.torque_params,
friction_input, lateral_accel_deadzone, friction_compensation=True,
gravity_adjusted=True)
freeze_integrator = steer_limited or CS.steeringPressed or CS.vEgo < 5
output_torque = self.pid.update(pid_log.error,
feedforward=ff,
speed=CS.vEgo,
freeze_integrator=freeze_integrator)
pid_log.active = True
pid_log.p = self.pid.p
pid_log.i = self.pid.i
pid_log.d = self.pid.d
pid_log.f = self.pid.f
pid_log.output = -output_torque
pid_log.actualLateralAccel = actual_lateral_accel
pid_log.desiredLateralAccel = desired_lateral_accel
pid_log.saturated = self._check_saturation(self.steer_max - abs(output_torque) < 1e-3, CS, steer_limited)
if nn_log is not None:
pid_log.nnLog = nn_log
# TODO left is positive in this convention
return -output_torque, 0.0, pid_log

View File

@@ -0,0 +1,2 @@
acados_ocp_lat.json
c_generated_code/

View File

@@ -0,0 +1,90 @@
Import('env', 'envCython', 'arch', 'messaging_python', 'common_python', 'opendbc_python')
gen = "c_generated_code"
casadi_model = [
f'{gen}/lat_model/lat_expl_ode_fun.c',
f'{gen}/lat_model/lat_expl_vde_forw.c',
]
casadi_cost_y = [
f'{gen}/lat_cost/lat_cost_y_fun.c',
f'{gen}/lat_cost/lat_cost_y_fun_jac_ut_xt.c',
f'{gen}/lat_cost/lat_cost_y_hess.c',
]
casadi_cost_e = [
f'{gen}/lat_cost/lat_cost_y_e_fun.c',
f'{gen}/lat_cost/lat_cost_y_e_fun_jac_ut_xt.c',
f'{gen}/lat_cost/lat_cost_y_e_hess.c',
]
casadi_cost_0 = [
f'{gen}/lat_cost/lat_cost_y_0_fun.c',
f'{gen}/lat_cost/lat_cost_y_0_fun_jac_ut_xt.c',
f'{gen}/lat_cost/lat_cost_y_0_hess.c',
]
build_files = [f'{gen}/acados_solver_lat.c'] + casadi_model + casadi_cost_y + casadi_cost_e + casadi_cost_0
# extra generated files used to trigger a rebuild
generated_files = [
f'{gen}/Makefile',
f'{gen}/main_lat.c',
f'{gen}/main_sim_lat.c',
f'{gen}/acados_solver_lat.h',
f'{gen}/acados_sim_solver_lat.h',
f'{gen}/acados_sim_solver_lat.c',
f'{gen}/acados_solver.pxd',
f'{gen}/lat_model/lat_expl_vde_adj.c',
f'{gen}/lat_model/lat_model.h',
f'{gen}/lat_constraints/lat_constraints.h',
f'{gen}/lat_cost/lat_cost.h',
] + build_files
acados_dir = '#third_party/acados'
acados_templates_dir = '#third_party/acados/acados_template/c_templates_tera'
source_list = ['lat_mpc.py',
'#selfdrive/modeld/constants.py',
f'{acados_dir}/include/acados_c/ocp_nlp_interface.h',
f'{acados_templates_dir}/acados_solver.in.c',
]
lenv = env.Clone()
lenv.Clean(generated_files, Dir(gen))
generated_lat = lenv.Command(generated_files,
source_list,
f"cd {Dir('.').abspath} && python3 lat_mpc.py")
lenv.Depends(generated_lat, [messaging_python, common_python, opendbc_python])
lenv["CFLAGS"].append("-DACADOS_WITH_QPOASES")
lenv["CXXFLAGS"].append("-DACADOS_WITH_QPOASES")
lenv["CCFLAGS"].append("-Wno-unused")
if arch != "Darwin":
lenv["LINKFLAGS"].append("-Wl,--disable-new-dtags")
lib_solver = lenv.SharedLibrary(f"{gen}/acados_ocp_solver_lat",
build_files,
LIBS=['m', 'acados', 'hpipm', 'blasfeo', 'qpOASES_e'])
# generate cython stuff
acados_ocp_solver_pyx = File("#third_party/acados/acados_template/acados_ocp_solver_pyx.pyx")
acados_ocp_solver_common = File("#third_party/acados/acados_template/acados_solver_common.pxd")
libacados_ocp_solver_pxd = File(f'{gen}/acados_solver.pxd')
libacados_ocp_solver_c = File(f'{gen}/acados_ocp_solver_pyx.c')
lenv2 = envCython.Clone()
lenv2["LINKFLAGS"] += [lib_solver[0].get_labspath()]
lenv2.Command(libacados_ocp_solver_c,
[acados_ocp_solver_pyx, acados_ocp_solver_common, libacados_ocp_solver_pxd],
f'cython' + \
f' -o {libacados_ocp_solver_c.get_labspath()}' + \
f' -I {libacados_ocp_solver_pxd.get_dir().get_labspath()}' + \
f' -I {acados_ocp_solver_common.get_dir().get_labspath()}' + \
f' {acados_ocp_solver_pyx.get_labspath()}')
lib_cython = lenv2.Program(f'{gen}/acados_ocp_solver_pyx.so', [libacados_ocp_solver_c])
lenv2.Depends(lib_cython, lib_solver)

View File

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
import os
import time
import numpy as np
from casadi import SX, vertcat, sin, cos
# WARNING: imports outside of constants will not trigger a rebuild
from openpilot.selfdrive.modeld.constants import ModelConstants
if __name__ == '__main__': # generating code
from openpilot.third_party.acados.acados_template import AcadosModel, AcadosOcp, AcadosOcpSolver
else:
from openpilot.selfdrive.controls.lib.lateral_mpc_lib.c_generated_code.acados_ocp_solver_pyx import AcadosOcpSolverCython
LAT_MPC_DIR = os.path.dirname(os.path.abspath(__file__))
EXPORT_DIR = os.path.join(LAT_MPC_DIR, "c_generated_code")
JSON_FILE = os.path.join(LAT_MPC_DIR, "acados_ocp_lat.json")
X_DIM = 4
P_DIM = 2
COST_E_DIM = 3
COST_DIM = COST_E_DIM + 2
SPEED_OFFSET = 10.0
MODEL_NAME = 'lat'
ACADOS_SOLVER_TYPE = 'SQP_RTI'
N = 32
def gen_lat_model():
model = AcadosModel()
model.name = MODEL_NAME
# set up states & controls
x_ego = SX.sym('x_ego')
y_ego = SX.sym('y_ego')
psi_ego = SX.sym('psi_ego')
psi_rate_ego = SX.sym('psi_rate_ego')
model.x = vertcat(x_ego, y_ego, psi_ego, psi_rate_ego)
# parameters
v_ego = SX.sym('v_ego')
rotation_radius = SX.sym('rotation_radius')
model.p = vertcat(v_ego, rotation_radius)
# controls
psi_accel_ego = SX.sym('psi_accel_ego')
model.u = vertcat(psi_accel_ego)
# xdot
x_ego_dot = SX.sym('x_ego_dot')
y_ego_dot = SX.sym('y_ego_dot')
psi_ego_dot = SX.sym('psi_ego_dot')
psi_rate_ego_dot = SX.sym('psi_rate_ego_dot')
model.xdot = vertcat(x_ego_dot, y_ego_dot, psi_ego_dot, psi_rate_ego_dot)
# dynamics model
f_expl = vertcat(v_ego * cos(psi_ego) - rotation_radius * sin(psi_ego) * psi_rate_ego,
v_ego * sin(psi_ego) + rotation_radius * cos(psi_ego) * psi_rate_ego,
psi_rate_ego,
psi_accel_ego)
model.f_impl_expr = model.xdot - f_expl
model.f_expl_expr = f_expl
return model
def gen_lat_ocp():
ocp = AcadosOcp()
ocp.model = gen_lat_model()
Tf = np.array(ModelConstants.T_IDXS)[N]
# set dimensions
ocp.dims.N = N
# set cost module
ocp.cost.cost_type = 'NONLINEAR_LS'
ocp.cost.cost_type_e = 'NONLINEAR_LS'
Q = np.diag(np.zeros(COST_E_DIM))
QR = np.diag(np.zeros(COST_DIM))
ocp.cost.W = QR
ocp.cost.W_e = Q
y_ego, psi_ego, psi_rate_ego = ocp.model.x[1], ocp.model.x[2], ocp.model.x[3]
psi_rate_ego_dot = ocp.model.u[0]
v_ego = ocp.model.p[0]
ocp.parameter_values = np.zeros((P_DIM, ))
ocp.cost.yref = np.zeros((COST_DIM, ))
ocp.cost.yref_e = np.zeros((COST_E_DIM, ))
# Add offset to smooth out low speed control
# TODO unclear if this right solution long term
v_ego_offset = v_ego + SPEED_OFFSET
# TODO there are two costs on psi_rate_ego_dot, one
# is correlated to jerk the other to steering wheel movement
# the steering wheel movement cost is added to prevent excessive
# wheel movements
ocp.model.cost_y_expr = vertcat(y_ego,
v_ego_offset * psi_ego,
v_ego_offset * psi_rate_ego,
v_ego_offset * psi_rate_ego_dot,
psi_rate_ego_dot / (v_ego + 0.1))
ocp.model.cost_y_expr_e = vertcat(y_ego,
v_ego_offset * psi_ego,
v_ego_offset * psi_rate_ego)
# set constraints
ocp.constraints.constr_type = 'BGH'
ocp.constraints.idxbx = np.array([2,3])
ocp.constraints.ubx = np.array([np.radians(90), np.radians(50)])
ocp.constraints.lbx = np.array([-np.radians(90), -np.radians(50)])
x0 = np.zeros((X_DIM,))
ocp.constraints.x0 = x0
ocp.solver_options.qp_solver = 'PARTIAL_CONDENSING_HPIPM'
ocp.solver_options.hessian_approx = 'GAUSS_NEWTON'
ocp.solver_options.integrator_type = 'ERK'
ocp.solver_options.nlp_solver_type = ACADOS_SOLVER_TYPE
ocp.solver_options.qp_solver_iter_max = 1
ocp.solver_options.qp_solver_cond_N = 1
# set prediction horizon
ocp.solver_options.tf = Tf
ocp.solver_options.shooting_nodes = np.array(ModelConstants.T_IDXS)[:N+1]
ocp.code_export_directory = EXPORT_DIR
return ocp
class LateralMpc():
def __init__(self, x0=None):
if x0 is None:
x0 = np.zeros(X_DIM)
self.solver = AcadosOcpSolverCython(MODEL_NAME, ACADOS_SOLVER_TYPE, N)
self.reset(x0)
def reset(self, x0=None):
if x0 is None:
x0 = np.zeros(X_DIM)
self.x_sol = np.zeros((N+1, X_DIM))
self.u_sol = np.zeros((N, 1))
self.yref = np.zeros((N+1, COST_DIM))
for i in range(N):
self.solver.cost_set(i, "yref", self.yref[i])
self.solver.cost_set(N, "yref", self.yref[N][:COST_E_DIM])
# Somehow needed for stable init
for i in range(N+1):
self.solver.set(i, 'x', np.zeros(X_DIM))
self.solver.set(i, 'p', np.zeros(P_DIM))
self.solver.constraints_set(0, "lbx", x0)
self.solver.constraints_set(0, "ubx", x0)
self.solver.solve()
self.solution_status = 0
self.solve_time = 0.0
self.cost = 0
def set_weights(self, path_weight, heading_weight,
lat_accel_weight, lat_jerk_weight,
steering_rate_weight):
W = np.asfortranarray(np.diag([path_weight, heading_weight,
lat_accel_weight, lat_jerk_weight,
steering_rate_weight]))
for i in range(N):
self.solver.cost_set(i, 'W', W)
self.solver.cost_set(N, 'W', W[:COST_E_DIM,:COST_E_DIM])
def run(self, x0, p, y_pts, heading_pts, yaw_rate_pts):
x0_cp = np.copy(x0)
p_cp = np.copy(p)
self.solver.constraints_set(0, "lbx", x0_cp)
self.solver.constraints_set(0, "ubx", x0_cp)
self.yref[:,0] = y_pts
v_ego = p_cp[0, 0]
# rotation_radius = p_cp[1]
self.yref[:,1] = heading_pts * (v_ego + SPEED_OFFSET)
self.yref[:,2] = yaw_rate_pts * (v_ego + SPEED_OFFSET)
for i in range(N):
self.solver.cost_set(i, "yref", self.yref[i])
self.solver.set(i, "p", p_cp[i])
self.solver.set(N, "p", p_cp[N])
self.solver.cost_set(N, "yref", self.yref[N][:COST_E_DIM])
t = time.monotonic()
self.solution_status = self.solver.solve()
self.solve_time = time.monotonic() - t
for i in range(N+1):
self.x_sol[i] = self.solver.get(i, 'x')
for i in range(N):
self.u_sol[i] = self.solver.get(i, 'u')
self.cost = self.solver.get_cost()
if __name__ == "__main__":
ocp = gen_lat_ocp()
AcadosOcpSolver.generate(ocp, json_file=JSON_FILE)
# AcadosOcpSolver.build(ocp.code_export_directory, with_cython=True)

View File

@@ -0,0 +1,155 @@
from cereal import car
from openpilot.common.numpy_fast import clip, interp
from openpilot.common.params import Params
from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.controls.lib.drive_helpers import CONTROL_N, apply_deadzone
from openpilot.selfdrive.controls.lib.pid import PIDController
from openpilot.selfdrive.modeld.constants import ModelConstants
LongCtrlState = car.CarControl.Actuators.LongControlState
def long_control_state_trans(CP, active, long_control_state, v_ego, v_target,
v_target_1sec, brake_pressed, cruise_standstill):
# Ignore cruise standstill if car has a gas interceptor
cruise_standstill = cruise_standstill and not CP.enableGasInterceptor
accelerating = v_target_1sec > v_target
planned_stop = (v_target < CP.vEgoStopping and
v_target_1sec < CP.vEgoStopping and
not accelerating)
stay_stopped = (v_ego < CP.vEgoStopping and
(brake_pressed or cruise_standstill))
stopping_condition = planned_stop or stay_stopped
starting_condition = (v_target_1sec > CP.vEgoStarting and
accelerating and
not cruise_standstill and
not brake_pressed)
started_condition = v_ego > CP.vEgoStarting
if not active:
long_control_state = LongCtrlState.off
else:
if long_control_state in (LongCtrlState.off, LongCtrlState.pid):
long_control_state = LongCtrlState.pid
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif long_control_state == LongCtrlState.stopping:
if starting_condition and CP.startingState:
long_control_state = LongCtrlState.starting
elif starting_condition:
long_control_state = LongCtrlState.pid
elif long_control_state == LongCtrlState.starting:
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif started_condition:
long_control_state = LongCtrlState.pid
return long_control_state
class LongControl:
def __init__(self, CP):
self.CP = CP
self.long_control_state = LongCtrlState.off # initialized to off
self.pid = PIDController((CP.longitudinalTuning.kpBP, CP.longitudinalTuning.kpV),
(CP.longitudinalTuning.kiBP, CP.longitudinalTuning.kiV),
k_f=CP.longitudinalTuning.kf, rate=1 / DT_CTRL)
self.v_pid = 0.0
self.last_output_accel = 0.0
# FrogPilot variables
self.params = Params()
self.params_memory = Params("/dev/shm/params")
self.update_frogpilot_params()
def reset(self, v_pid):
"""Reset PID controller and change setpoint"""
self.pid.reset()
self.v_pid = v_pid
def update(self, active, CS, long_plan, accel_limits, t_since_plan):
"""Update longitudinal control. This updates the state machine and runs a PID loop"""
# Interp control trajectory
speeds = long_plan.speeds
if len(speeds) == CONTROL_N:
v_target_now = interp(t_since_plan, ModelConstants.T_IDXS[:CONTROL_N], speeds)
a_target_now = interp(t_since_plan, ModelConstants.T_IDXS[:CONTROL_N], long_plan.accels)
v_target_lower = interp(self.CP.longitudinalActuatorDelayLowerBound + t_since_plan, ModelConstants.T_IDXS[:CONTROL_N], speeds)
a_target_lower = 2 * (v_target_lower - v_target_now) / self.CP.longitudinalActuatorDelayLowerBound - a_target_now
v_target_upper = interp(self.CP.longitudinalActuatorDelayUpperBound + t_since_plan, ModelConstants.T_IDXS[:CONTROL_N], speeds)
a_target_upper = 2 * (v_target_upper - v_target_now) / self.CP.longitudinalActuatorDelayUpperBound - a_target_now
v_target = min(v_target_lower, v_target_upper)
a_target = min(a_target_lower, a_target_upper)
v_target_1sec = interp(self.CP.longitudinalActuatorDelayUpperBound + t_since_plan + 1.0, ModelConstants.T_IDXS[:CONTROL_N], speeds)
else:
v_target = 0.0
v_target_now = 0.0
v_target_1sec = 0.0
a_target = 0.0
self.pid.neg_limit = accel_limits[0]
self.pid.pos_limit = accel_limits[1]
output_accel = self.last_output_accel
self.long_control_state = long_control_state_trans(self.CP, active, self.long_control_state, CS.vEgo,
v_target, v_target_1sec, CS.brakePressed,
CS.cruiseState.standstill)
if self.long_control_state == LongCtrlState.off:
self.reset(CS.vEgo)
output_accel = 0.
elif self.long_control_state == LongCtrlState.stopping:
if output_accel > self.CP.stopAccel:
output_accel = min(output_accel, 0.0)
output_accel -= self.CP.stoppingDecelRate * DT_CTRL
self.reset(CS.vEgo)
elif self.long_control_state == LongCtrlState.starting:
output_accel = self.CP.startAccel
self.reset(CS.vEgo)
elif self.long_control_state == LongCtrlState.pid:
self.v_pid = v_target_now
# Toyota starts braking more when it thinks you want to stop
# Freeze the integrator so we don't accelerate to compensate, and don't allow positive acceleration
# TODO too complex, needs to be simplified and tested on toyotas
prevent_overshoot = not self.CP.stoppingControl and CS.vEgo < 1.5 and v_target_1sec < 0.7 and v_target_1sec < self.v_pid
deadzone = interp(CS.vEgo, self.CP.longitudinalTuning.deadzoneBP, self.CP.longitudinalTuning.deadzoneV)
freeze_integrator = prevent_overshoot
error = self.v_pid - CS.vEgo
error_deadzone = apply_deadzone(error, deadzone)
output_accel = self.pid.update(error_deadzone, speed=CS.vEgo,
feedforward=a_target,
freeze_integrator=freeze_integrator)
self.last_output_accel = clip(output_accel, accel_limits[0], accel_limits[1])
if self.params_memory.get_bool("FrogPilotTogglesUpdated"):
self.update_frogpilot_params()
return self.last_output_accel
def update_frogpilot_params(self):
kiV = [self.params.get_float("kiV1"), self.params.get_float("kiV2"), self.params.get_float("kiV3"), self.params.get_float("kiV4")]
kpV = [self.params.get_float("kpV1"), self.params.get_float("kpV2"), self.params.get_float("kpV3"), self.params.get_float("kpV4")]
if self.params.get_bool("Tuning"):
self.pid = PIDController((self.CP.longitudinalTuning.kpBP, kpV),
(self.CP.longitudinalTuning.kiBP, kiV),
k_f=self.CP.longitudinalTuning.kf, rate=1 / DT_CTRL)
else:
self.pid = PIDController((self.CP.longitudinalTuning.kpBP, self.CP.longitudinalTuning.kpV),
(self.CP.longitudinalTuning.kiBP, self.CP.longitudinalTuning.kiV),
k_f=self.CP.longitudinalTuning.kf, rate=1 / DT_CTRL)

View File

@@ -0,0 +1,2 @@
acados_ocp_long.json
c_generated_code/

View File

@@ -0,0 +1,96 @@
Import('env', 'envCython', 'arch', 'messaging_python', 'common_python', 'opendbc_python')
gen = "c_generated_code"
casadi_model = [
f'{gen}/long_model/long_expl_ode_fun.c',
f'{gen}/long_model/long_expl_vde_forw.c',
]
casadi_cost_y = [
f'{gen}/long_cost/long_cost_y_fun.c',
f'{gen}/long_cost/long_cost_y_fun_jac_ut_xt.c',
f'{gen}/long_cost/long_cost_y_hess.c',
]
casadi_cost_e = [
f'{gen}/long_cost/long_cost_y_e_fun.c',
f'{gen}/long_cost/long_cost_y_e_fun_jac_ut_xt.c',
f'{gen}/long_cost/long_cost_y_e_hess.c',
]
casadi_cost_0 = [
f'{gen}/long_cost/long_cost_y_0_fun.c',
f'{gen}/long_cost/long_cost_y_0_fun_jac_ut_xt.c',
f'{gen}/long_cost/long_cost_y_0_hess.c',
]
casadi_constraints = [
f'{gen}/long_constraints/long_constr_h_fun.c',
f'{gen}/long_constraints/long_constr_h_fun_jac_uxt_zt.c',
]
build_files = [f'{gen}/acados_solver_long.c'] + casadi_model + casadi_cost_y + casadi_cost_e + \
casadi_cost_0 + casadi_constraints
# extra generated files used to trigger a rebuild
generated_files = [
f'{gen}/Makefile',
f'{gen}/main_long.c',
f'{gen}/main_sim_long.c',
f'{gen}/acados_solver_long.h',
f'{gen}/acados_sim_solver_long.h',
f'{gen}/acados_sim_solver_long.c',
f'{gen}/acados_solver.pxd',
f'{gen}/long_model/long_expl_vde_adj.c',
f'{gen}/long_model/long_model.h',
f'{gen}/long_constraints/long_constraints.h',
f'{gen}/long_cost/long_cost.h',
] + build_files
acados_dir = '#third_party/acados'
acados_templates_dir = '#third_party/acados/acados_template/c_templates_tera'
source_list = ['long_mpc.py',
'#selfdrive/modeld/constants.py',
f'{acados_dir}/include/acados_c/ocp_nlp_interface.h',
f'{acados_templates_dir}/acados_solver.in.c',
]
lenv = env.Clone()
lenv.Clean(generated_files, Dir(gen))
generated_long = lenv.Command(generated_files,
source_list,
f"cd {Dir('.').abspath} && python3 long_mpc.py")
lenv.Depends(generated_long, [messaging_python, common_python, opendbc_python])
lenv["CFLAGS"].append("-DACADOS_WITH_QPOASES")
lenv["CXXFLAGS"].append("-DACADOS_WITH_QPOASES")
lenv["CCFLAGS"].append("-Wno-unused")
if arch != "Darwin":
lenv["LINKFLAGS"].append("-Wl,--disable-new-dtags")
lib_solver = lenv.SharedLibrary(f"{gen}/acados_ocp_solver_long",
build_files,
LIBS=['m', 'acados', 'hpipm', 'blasfeo', 'qpOASES_e'])
# generate cython stuff
acados_ocp_solver_pyx = File("#third_party/acados/acados_template/acados_ocp_solver_pyx.pyx")
acados_ocp_solver_common = File("#third_party/acados/acados_template/acados_solver_common.pxd")
libacados_ocp_solver_pxd = File(f'{gen}/acados_solver.pxd')
libacados_ocp_solver_c = File(f'{gen}/acados_ocp_solver_pyx.c')
lenv2 = envCython.Clone()
lenv2["LINKFLAGS"] += [lib_solver[0].get_labspath()]
lenv2.Command(libacados_ocp_solver_c,
[acados_ocp_solver_pyx, acados_ocp_solver_common, libacados_ocp_solver_pxd],
f'cython' + \
f' -o {libacados_ocp_solver_c.get_labspath()}' + \
f' -I {libacados_ocp_solver_pxd.get_dir().get_labspath()}' + \
f' -I {acados_ocp_solver_common.get_dir().get_labspath()}' + \
f' {acados_ocp_solver_pyx.get_labspath()}')
lib_cython = lenv2.Program(f'{gen}/acados_ocp_solver_pyx.so', [libacados_ocp_solver_c])
lenv2.Depends(lib_cython, lib_solver)

View File

@@ -0,0 +1,496 @@
#!/usr/bin/env python3
import os
import time
import numpy as np
from cereal import log
from openpilot.common.conversions import Conversions as CV
from openpilot.common.numpy_fast import clip
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
# WARNING: imports outside of constants will not trigger a rebuild
from openpilot.selfdrive.modeld.constants import index_function
from openpilot.selfdrive.car.interfaces import ACCEL_MIN
if __name__ == '__main__': # generating code
from openpilot.third_party.acados.acados_template import AcadosModel, AcadosOcp, AcadosOcpSolver
else:
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.c_generated_code.acados_ocp_solver_pyx import AcadosOcpSolverCython
from casadi import SX, vertcat
MODEL_NAME = 'long'
LONG_MPC_DIR = os.path.dirname(os.path.abspath(__file__))
EXPORT_DIR = os.path.join(LONG_MPC_DIR, "c_generated_code")
JSON_FILE = os.path.join(LONG_MPC_DIR, "acados_ocp_long.json")
SOURCES = ['lead0', 'lead1', 'cruise', 'e2e']
X_DIM = 3
U_DIM = 1
PARAM_DIM = 6
COST_E_DIM = 5
COST_DIM = COST_E_DIM + 1
CONSTR_DIM = 4
X_EGO_OBSTACLE_COST = 3.
X_EGO_COST = 0.
V_EGO_COST = 0.
A_EGO_COST = 0.
J_EGO_COST = 5.0
A_CHANGE_COST = 200.
DANGER_ZONE_COST = 100.
CRASH_DISTANCE = .25
LEAD_DANGER_FACTOR = 0.75
LIMIT_COST = 1e6
ACADOS_SOLVER_TYPE = 'SQP_RTI'
# Default lead acceleration decay set to 50% at 1s
LEAD_ACCEL_TAU = 1.5
# Fewer timestamps don't hurt performance and lead to
# much better convergence of the MPC with low iterations
N = 12
MAX_T = 10.0
T_IDXS_LST = [index_function(idx, max_val=MAX_T, max_idx=N) for idx in range(N+1)]
T_IDXS = np.array(T_IDXS_LST)
FCW_IDXS = T_IDXS < 5.0
T_DIFFS = np.diff(T_IDXS, prepend=[0.])
COMFORT_BRAKE = 2.5
STOP_DISTANCE = 6.0
def get_jerk_factor(custom_personalities=False, aggressive_jerk=0.5, standard_jerk=1.0, relaxed_jerk=1.0, personality=log.LongitudinalPersonality.standard):
if custom_personalities:
if personality==log.LongitudinalPersonality.relaxed:
return relaxed_jerk
elif personality==log.LongitudinalPersonality.standard:
return standard_jerk
elif personality==log.LongitudinalPersonality.aggressive:
return aggressive_jerk
else:
raise NotImplementedError("Longitudinal personality not supported")
else:
if personality==log.LongitudinalPersonality.relaxed:
return 1.0
elif personality==log.LongitudinalPersonality.standard:
return 1.0
elif personality==log.LongitudinalPersonality.aggressive:
return 0.5
else:
raise NotImplementedError("Longitudinal personality not supported")
def get_T_FOLLOW(custom_personalities=False, aggressive_follow=1.25, standard_follow=1.45, relaxed_follow=1.75, personality=log.LongitudinalPersonality.standard):
if custom_personalities:
if personality==log.LongitudinalPersonality.relaxed:
return relaxed_follow
elif personality==log.LongitudinalPersonality.standard:
return standard_follow
elif personality==log.LongitudinalPersonality.aggressive:
return aggressive_follow
else:
raise NotImplementedError("Longitudinal personality not supported")
else:
if personality==log.LongitudinalPersonality.relaxed:
return 1.75
elif personality==log.LongitudinalPersonality.standard:
return 1.45
elif personality==log.LongitudinalPersonality.aggressive:
return 1.25
else:
raise NotImplementedError("Longitudinal personality not supported")
def get_stopped_equivalence_factor(v_lead):
return (v_lead**2) / (2 * COMFORT_BRAKE)
def get_safe_obstacle_distance(v_ego, t_follow):
return (v_ego**2) / (2 * COMFORT_BRAKE) + t_follow * v_ego + STOP_DISTANCE
def desired_follow_distance(v_ego, v_lead, t_follow=None):
if t_follow is None:
t_follow = get_T_FOLLOW()
return get_safe_obstacle_distance(v_ego, t_follow) - get_stopped_equivalence_factor(v_lead)
def gen_long_model():
model = AcadosModel()
model.name = MODEL_NAME
# set up states & controls
x_ego = SX.sym('x_ego')
v_ego = SX.sym('v_ego')
a_ego = SX.sym('a_ego')
model.x = vertcat(x_ego, v_ego, a_ego)
# controls
j_ego = SX.sym('j_ego')
model.u = vertcat(j_ego)
# xdot
x_ego_dot = SX.sym('x_ego_dot')
v_ego_dot = SX.sym('v_ego_dot')
a_ego_dot = SX.sym('a_ego_dot')
model.xdot = vertcat(x_ego_dot, v_ego_dot, a_ego_dot)
# live parameters
a_min = SX.sym('a_min')
a_max = SX.sym('a_max')
x_obstacle = SX.sym('x_obstacle')
prev_a = SX.sym('prev_a')
lead_t_follow = SX.sym('lead_t_follow')
lead_danger_factor = SX.sym('lead_danger_factor')
model.p = vertcat(a_min, a_max, x_obstacle, prev_a, lead_t_follow, lead_danger_factor)
# dynamics model
f_expl = vertcat(v_ego, a_ego, j_ego)
model.f_impl_expr = model.xdot - f_expl
model.f_expl_expr = f_expl
return model
def gen_long_ocp():
ocp = AcadosOcp()
ocp.model = gen_long_model()
Tf = T_IDXS[-1]
# set dimensions
ocp.dims.N = N
# set cost module
ocp.cost.cost_type = 'NONLINEAR_LS'
ocp.cost.cost_type_e = 'NONLINEAR_LS'
QR = np.zeros((COST_DIM, COST_DIM))
Q = np.zeros((COST_E_DIM, COST_E_DIM))
ocp.cost.W = QR
ocp.cost.W_e = Q
x_ego, v_ego, a_ego = ocp.model.x[0], ocp.model.x[1], ocp.model.x[2]
j_ego = ocp.model.u[0]
a_min, a_max = ocp.model.p[0], ocp.model.p[1]
x_obstacle = ocp.model.p[2]
prev_a = ocp.model.p[3]
lead_t_follow = ocp.model.p[4]
lead_danger_factor = ocp.model.p[5]
ocp.cost.yref = np.zeros((COST_DIM, ))
ocp.cost.yref_e = np.zeros((COST_E_DIM, ))
desired_dist_comfort = get_safe_obstacle_distance(v_ego, lead_t_follow)
# The main cost in normal operation is how close you are to the "desired" distance
# from an obstacle at every timestep. This obstacle can be a lead car
# or other object. In e2e mode we can use x_position targets as a cost
# instead.
costs = [((x_obstacle - x_ego) - (desired_dist_comfort)) / (v_ego + 10.),
x_ego,
v_ego,
a_ego,
a_ego - prev_a,
j_ego]
ocp.model.cost_y_expr = vertcat(*costs)
ocp.model.cost_y_expr_e = vertcat(*costs[:-1])
# Constraints on speed, acceleration and desired distance to
# the obstacle, which is treated as a slack constraint so it
# behaves like an asymmetrical cost.
constraints = vertcat(v_ego,
(a_ego - a_min),
(a_max - a_ego),
((x_obstacle - x_ego) - lead_danger_factor * (desired_dist_comfort)) / (v_ego + 10.))
ocp.model.con_h_expr = constraints
x0 = np.zeros(X_DIM)
ocp.constraints.x0 = x0
ocp.parameter_values = np.array([-1.2, 1.2, 0.0, 0.0, get_T_FOLLOW(), LEAD_DANGER_FACTOR])
# We put all constraint cost weights to 0 and only set them at runtime
cost_weights = np.zeros(CONSTR_DIM)
ocp.cost.zl = cost_weights
ocp.cost.Zl = cost_weights
ocp.cost.Zu = cost_weights
ocp.cost.zu = cost_weights
ocp.constraints.lh = np.zeros(CONSTR_DIM)
ocp.constraints.uh = 1e4*np.ones(CONSTR_DIM)
ocp.constraints.idxsh = np.arange(CONSTR_DIM)
# The HPIPM solver can give decent solutions even when it is stopped early
# Which is critical for our purpose where compute time is strictly bounded
# We use HPIPM in the SPEED_ABS mode, which ensures fastest runtime. This
# does not cause issues since the problem is well bounded.
ocp.solver_options.qp_solver = 'PARTIAL_CONDENSING_HPIPM'
ocp.solver_options.hessian_approx = 'GAUSS_NEWTON'
ocp.solver_options.integrator_type = 'ERK'
ocp.solver_options.nlp_solver_type = ACADOS_SOLVER_TYPE
ocp.solver_options.qp_solver_cond_N = 1
# More iterations take too much time and less lead to inaccurate convergence in
# some situations. Ideally we would run just 1 iteration to ensure fixed runtime.
ocp.solver_options.qp_solver_iter_max = 10
ocp.solver_options.qp_tol = 1e-3
# set prediction horizon
ocp.solver_options.tf = Tf
ocp.solver_options.shooting_nodes = T_IDXS
ocp.code_export_directory = EXPORT_DIR
return ocp
class LongitudinalMpc:
def __init__(self, mode='acc'):
# FrogPilot variables
self.params_memory = Params("/dev/shm/params")
self.update_frogpilot_params()
self.mode = mode
self.solver = AcadosOcpSolverCython(MODEL_NAME, ACADOS_SOLVER_TYPE, N)
self.reset()
self.source = SOURCES[2]
def reset(self):
# self.solver = AcadosOcpSolverCython(MODEL_NAME, ACADOS_SOLVER_TYPE, N)
self.solver.reset()
# self.solver.options_set('print_level', 2)
self.v_solution = np.zeros(N+1)
self.a_solution = np.zeros(N+1)
self.prev_a = np.array(self.a_solution)
self.j_solution = np.zeros(N)
self.yref = np.zeros((N+1, COST_DIM))
for i in range(N):
self.solver.cost_set(i, "yref", self.yref[i])
self.solver.cost_set(N, "yref", self.yref[N][:COST_E_DIM])
self.x_sol = np.zeros((N+1, X_DIM))
self.u_sol = np.zeros((N,1))
self.params = np.zeros((N+1, PARAM_DIM))
for i in range(N+1):
self.solver.set(i, 'x', np.zeros(X_DIM))
self.last_cloudlog_t = 0
self.status = False
self.crash_cnt = 0.0
self.solution_status = 0
# timers
self.solve_time = 0.0
self.time_qp_solution = 0.0
self.time_linearization = 0.0
self.time_integrator = 0.0
self.x0 = np.zeros(X_DIM)
self.set_weights()
def set_cost_weights(self, cost_weights, constraint_cost_weights):
W = np.asfortranarray(np.diag(cost_weights))
for i in range(N):
# TODO don't hardcode A_CHANGE_COST idx
# reduce the cost on (a-a_prev) later in the horizon.
W[4,4] = cost_weights[4] * np.interp(T_IDXS[i], [0.0, 1.0, 2.0], [1.0, 1.0, 0.0])
self.solver.cost_set(i, 'W', W)
# Setting the slice without the copy make the array not contiguous,
# causing issues with the C interface.
self.solver.cost_set(N, 'W', np.copy(W[:COST_E_DIM, :COST_E_DIM]))
# Set L2 slack cost on lower bound constraints
Zl = np.array(constraint_cost_weights)
for i in range(N):
self.solver.cost_set(i, 'Zl', Zl)
def set_weights(self, jerk_factor=1.0, prev_accel_constraint=True, personality=log.LongitudinalPersonality.standard):
if self.mode == 'acc':
a_change_cost = A_CHANGE_COST if prev_accel_constraint else 0
cost_weights = [X_EGO_OBSTACLE_COST, X_EGO_COST, V_EGO_COST, A_EGO_COST, jerk_factor * a_change_cost, jerk_factor * J_EGO_COST]
constraint_cost_weights = [LIMIT_COST, LIMIT_COST, LIMIT_COST, DANGER_ZONE_COST]
elif self.mode == 'blended':
a_change_cost = 40.0 if prev_accel_constraint else 0
cost_weights = [0., 0.1, 0.2, 5.0, a_change_cost, 1.0]
constraint_cost_weights = [LIMIT_COST, LIMIT_COST, LIMIT_COST, 50.0]
else:
raise NotImplementedError(f'Planner mode {self.mode} not recognized in planner cost set')
self.set_cost_weights(cost_weights, constraint_cost_weights)
def set_cur_state(self, v, a):
v_prev = self.x0[1]
self.x0[1] = v
self.x0[2] = a
if abs(v_prev - v) > 2.: # probably only helps if v < v_prev
for i in range(N+1):
self.solver.set(i, 'x', self.x0)
@staticmethod
def extrapolate_lead(x_lead, v_lead, a_lead, a_lead_tau):
a_lead_traj = a_lead * np.exp(-a_lead_tau * (T_IDXS**2)/2.)
v_lead_traj = np.clip(v_lead + np.cumsum(T_DIFFS * a_lead_traj), 0.0, 1e8)
x_lead_traj = x_lead + np.cumsum(T_DIFFS * v_lead_traj)
lead_xv = np.column_stack((x_lead_traj, v_lead_traj))
return lead_xv
def process_lead(self, lead, increased_stopping_distance=0):
v_ego = self.x0[1]
increased_stopping_distance = max(increased_stopping_distance - v_ego, 0)
if lead is not None and lead.status:
x_lead = lead.dRel - increased_stopping_distance
v_lead = lead.vLead
a_lead = lead.aLeadK
a_lead_tau = lead.aLeadTau
else:
# Fake a fast lead car, so mpc can keep running in the same mode
x_lead = 50.0
v_lead = v_ego + 10.0
a_lead = 0.0
a_lead_tau = LEAD_ACCEL_TAU
# MPC will not converge if immediate crash is expected
# Clip lead distance to what is still possible to brake for
min_x_lead = ((v_ego + v_lead)/2) * (v_ego - v_lead) / (-ACCEL_MIN * 2)
x_lead = clip(x_lead, min_x_lead, 1e8)
v_lead = clip(v_lead, 0.0, 1e8)
a_lead = clip(a_lead, -10., 5.)
lead_xv = self.extrapolate_lead(x_lead, v_lead, a_lead, a_lead_tau)
return lead_xv
def set_accel_limits(self, min_a, max_a):
# TODO this sets a max accel limit, but the minimum limit is only for cruise decel
# needs refactor
self.cruise_min_a = min_a
self.max_a = max_a
def update(self, lead_one, lead_two, v_cruise, x, v, a, j, radarless_model, t_follow, trafficModeActive, personality=log.LongitudinalPersonality.standard):
v_ego = self.x0[1]
self.status = lead_one.status or lead_two.status
lead_xv_0 = self.process_lead(lead_one, self.increased_stopping_distance if not trafficModeActive else 0)
lead_xv_1 = self.process_lead(lead_two)
# To estimate a safe distance from a moving lead, we calculate how much stopping
# distance that lead needs as a minimum. We can add that to the current distance
# and then treat that as a stopped car/obstacle at this new distance.
lead_0_obstacle = lead_xv_0[:,0] + get_stopped_equivalence_factor(lead_xv_0[:,1])
lead_1_obstacle = lead_xv_1[:,0] + get_stopped_equivalence_factor(lead_xv_1[:,1])
self.params[:,0] = ACCEL_MIN
self.params[:,1] = self.max_a
# Update in ACC mode or ACC/e2e blend
if self.mode == 'acc':
self.params[:,5] = LEAD_DANGER_FACTOR
# Fake an obstacle for cruise, this ensures smooth acceleration to set speed
# when the leads are no factor.
v_lower = v_ego + (T_IDXS * self.cruise_min_a * 1.05)
v_upper = v_ego + (T_IDXS * self.max_a * 1.05)
v_cruise_clipped = np.clip(v_cruise * np.ones(N+1),
v_lower,
v_upper)
cruise_obstacle = np.cumsum(T_DIFFS * v_cruise_clipped) + get_safe_obstacle_distance(v_cruise_clipped, t_follow)
x_obstacles = np.column_stack([lead_0_obstacle, lead_1_obstacle, cruise_obstacle])
self.source = SOURCES[np.argmin(x_obstacles[0])]
# These are not used in ACC mode
x[:], v[:], a[:], j[:] = 0.0, 0.0, 0.0, 0.0
elif self.mode == 'blended':
self.params[:,5] = 1.0
x_obstacles = np.column_stack([lead_0_obstacle,
lead_1_obstacle])
cruise_target = T_IDXS * np.clip(v_cruise, v_ego - 2.0, 1e3) + x[0]
xforward = ((v[1:] + v[:-1]) / 2) * (T_IDXS[1:] - T_IDXS[:-1])
x = np.cumsum(np.insert(xforward, 0, x[0]))
x_and_cruise = np.column_stack([x, cruise_target])
x = np.min(x_and_cruise, axis=1)
self.source = 'e2e' if x_and_cruise[1,0] < x_and_cruise[1,1] else 'cruise'
else:
raise NotImplementedError(f'Planner mode {self.mode} not recognized in planner update')
self.yref[:,1] = x
self.yref[:,2] = v
self.yref[:,3] = a
self.yref[:,5] = j
for i in range(N):
self.solver.set(i, "yref", self.yref[i])
self.solver.set(N, "yref", self.yref[N][:COST_E_DIM])
self.params[:,2] = np.min(x_obstacles, axis=1)
self.params[:,3] = np.copy(self.prev_a)
self.params[:,4] = t_follow
self.run()
lead_probability = lead_one.prob if radarless_model else lead_one.modelProb
if (np.any(lead_xv_0[FCW_IDXS,0] - self.x_sol[FCW_IDXS,0] < CRASH_DISTANCE) and lead_probability > 0.9):
self.crash_cnt += 1
else:
self.crash_cnt = 0
# Check if it got within lead comfort range
# TODO This should be done cleaner
if self.mode == 'blended':
if any((lead_0_obstacle - get_safe_obstacle_distance(self.x_sol[:,1], t_follow))- self.x_sol[:,0] < 0.0):
self.source = 'lead0'
if any((lead_1_obstacle - get_safe_obstacle_distance(self.x_sol[:,1], t_follow))- self.x_sol[:,0] < 0.0) and \
(lead_1_obstacle[0] - lead_0_obstacle[0]):
self.source = 'lead1'
if self.params_memory.get_bool("FrogPilotTogglesUpdated"):
self.update_frogpilot_params()
def run(self):
# t0 = time.monotonic()
# reset = 0
for i in range(N+1):
self.solver.set(i, 'p', self.params[i])
self.solver.constraints_set(0, "lbx", self.x0)
self.solver.constraints_set(0, "ubx", self.x0)
self.solution_status = self.solver.solve()
self.solve_time = float(self.solver.get_stats('time_tot')[0])
self.time_qp_solution = float(self.solver.get_stats('time_qp')[0])
self.time_linearization = float(self.solver.get_stats('time_lin')[0])
self.time_integrator = float(self.solver.get_stats('time_sim')[0])
# qp_iter = self.solver.get_stats('statistics')[-1][-1] # SQP_RTI specific
# print(f"long_mpc timings: tot {self.solve_time:.2e}, qp {self.time_qp_solution:.2e}, lin {self.time_linearization:.2e}, \
# integrator {self.time_integrator:.2e}, qp_iter {qp_iter}")
# res = self.solver.get_residuals()
# print(f"long_mpc residuals: {res[0]:.2e}, {res[1]:.2e}, {res[2]:.2e}, {res[3]:.2e}")
# self.solver.print_statistics()
for i in range(N+1):
self.x_sol[i] = self.solver.get(i, 'x')
for i in range(N):
self.u_sol[i] = self.solver.get(i, 'u')
self.v_solution = self.x_sol[:,1]
self.a_solution = self.x_sol[:,2]
self.j_solution = self.u_sol[:,0]
self.prev_a = np.interp(T_IDXS + 0.05, T_IDXS, self.a_solution)
t = time.monotonic()
if self.solution_status != 0:
if t > self.last_cloudlog_t + 5.0:
self.last_cloudlog_t = t
cloudlog.warning(f"Long mpc reset, solution_status: {self.solution_status}")
self.reset()
# reset = 1
# print(f"long_mpc timings: total internal {self.solve_time:.2e}, external: {(time.monotonic() - t0):.2e} qp {self.time_qp_solution:.2e}, \
# lin {self.time_linearization:.2e} qp_iter {qp_iter}, reset {reset}")
def update_frogpilot_params(self):
params = Params()
is_metric = params.get_bool("IsMetric")
longitudinal_tune = params.get_bool("LongitudinalTune")
self.increased_stopping_distance = params.get_int("StoppingDistance") * (1 if is_metric else CV.FOOT_TO_METER) if longitudinal_tune else 0
if __name__ == "__main__":
ocp = gen_long_ocp()
AcadosOcpSolver.generate(ocp, json_file=JSON_FILE)
# AcadosOcpSolver.build(ocp.code_export_directory, with_cython=True)

View File

@@ -0,0 +1,282 @@
#!/usr/bin/env python3
import math
import numpy as np
from openpilot.common.numpy_fast import clip, interp
import cereal.messaging as messaging
from openpilot.common.conversions import Conversions as CV
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.params import Params
from openpilot.common.simple_kalman import KF1D
from openpilot.common.realtime import DT_MDL
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.car.interfaces import ACCEL_MIN, ACCEL_MAX
from openpilot.selfdrive.controls.lib.longcontrol import LongCtrlState
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import LongitudinalMpc
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC, LEAD_ACCEL_TAU
from openpilot.selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX, CONTROL_N, get_speed_error
from openpilot.system.version import get_short_branch
from openpilot.selfdrive.frogpilot.controls.lib.model_manager import RADARLESS_MODELS
import sys
LON_MPC_STEP = 0.2 # first step is 0.2s
A_CRUISE_MIN = -1.2
A_CRUISE_MAX_VALS = [1.6, 1.2, 0.8, 0.6]
A_CRUISE_MAX_BP = [0., 10.0, 25., 40.]
# Lookup table for turns
_A_TOTAL_MAX_V = [1.7, 3.2]
_A_TOTAL_MAX_BP = [20., 40.]
# Kalman filter states enum
LEAD_KALMAN_SPEED, LEAD_KALMAN_ACCEL = 0, 1
def get_max_accel(v_ego):
return interp(v_ego, A_CRUISE_MAX_BP, A_CRUISE_MAX_VALS)
def limit_accel_in_turns(v_ego, angle_steers, a_target, CP):
"""
This function returns a limited long acceleration allowed, depending on the existing lateral acceleration
this should avoid accelerating when losing the target in turns
"""
# FIXME: This function to calculate lateral accel is incorrect and should use the VehicleModel
# The lookup table for turns should also be updated if we do this
a_total_max = interp(v_ego, _A_TOTAL_MAX_BP, _A_TOTAL_MAX_V)
a_y = v_ego ** 2 * angle_steers * CV.DEG_TO_RAD / (CP.steerRatio * CP.wheelbase)
a_x_allowed = math.sqrt(max(a_total_max ** 2 - a_y ** 2, 0.))
return [a_target[0], min(a_target[1], a_x_allowed)]
def lead_kf(v_lead: float, dt: float = 0.05):
# Lead Kalman Filter params, calculating K from A, C, Q, R requires the control library.
# hardcoding a lookup table to compute K for values of radar_ts between 0.01s and 0.2s
assert dt > .01 and dt < .2, "Radar time step must be between .01s and 0.2s"
A = [[1.0, dt], [0.0, 1.0]]
C = [1.0, 0.0]
#Q = np.matrix([[10., 0.0], [0.0, 100.]])
#R = 1e3
#K = np.matrix([[ 0.05705578], [ 0.03073241]])
dts = [dt * 0.01 for dt in range(1, 21)]
K0 = [0.12287673, 0.14556536, 0.16522756, 0.18281627, 0.1988689, 0.21372394,
0.22761098, 0.24069424, 0.253096, 0.26491023, 0.27621103, 0.28705801,
0.29750003, 0.30757767, 0.31732515, 0.32677158, 0.33594201, 0.34485814,
0.35353899, 0.36200124]
K1 = [0.29666309, 0.29330885, 0.29042818, 0.28787125, 0.28555364, 0.28342219,
0.28144091, 0.27958406, 0.27783249, 0.27617149, 0.27458948, 0.27307714,
0.27162685, 0.27023228, 0.26888809, 0.26758976, 0.26633338, 0.26511557,
0.26393339, 0.26278425]
K = [[interp(dt, dts, K0)], [interp(dt, dts, K1)]]
kf = KF1D([[v_lead], [0.0]], A, C, K)
return kf
class Lead:
def __init__(self):
self.dRel = 0.0
self.yRel = 0.0
self.vLead = 0.0
self.aLead = 0.0
self.vLeadK = 0.0
self.aLeadK = 0.0
self.aLeadTau = LEAD_ACCEL_TAU
self.prob = 0.0
self.status = False
self.kf: KF1D | None = None
def reset(self):
self.status = False
self.kf = None
self.aLeadTau = LEAD_ACCEL_TAU
def update(self, dRel: float, yRel: float, vLead: float, aLead: float, prob: float):
self.dRel = dRel
self.yRel = yRel
self.vLead = vLead
self.aLead = aLead
self.prob = prob
self.status = True
if self.kf is None:
self.kf = lead_kf(self.vLead)
else:
self.kf.update(self.vLead)
self.vLeadK = float(self.kf.x[LEAD_KALMAN_SPEED][0])
self.aLeadK = float(self.kf.x[LEAD_KALMAN_ACCEL][0])
# Learn if constant acceleration
if abs(self.aLeadK) < 0.5:
self.aLeadTau = LEAD_ACCEL_TAU
else:
self.aLeadTau *= 0.9
class LongitudinalPlanner:
def __init__(self, CP, init_v=0.0, init_a=0.0, dt=DT_MDL):
self.CP = CP
self.mpc = LongitudinalMpc()
self.fcw = False
self.dt = dt
self.a_desired = init_a
self.v_desired_filter = FirstOrderFilter(init_v, 2.0, self.dt)
self.v_model_error = 0.0
self.lead_one = Lead()
self.lead_two = Lead()
self.v_desired_trajectory = np.zeros(CONTROL_N)
self.a_desired_trajectory = np.zeros(CONTROL_N)
self.j_desired_trajectory = np.zeros(CONTROL_N)
self.solverExecutionTime = 0.0
# FrogPilot variables
self.params = Params()
self.params_memory = Params("/dev/shm/params")
self.radarless_model = self.params.get("Model", encoding='utf-8') in RADARLESS_MODELS
self.release = get_short_branch() == "clearpilot"
self.update_frogpilot_params()
@staticmethod
def parse_model(model_msg, model_error, v_ego, taco_tune):
if (len(model_msg.position.x) == 33 and
len(model_msg.velocity.x) == 33 and
len(model_msg.acceleration.x) == 33):
x = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.position.x) - model_error * T_IDXS_MPC
v = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.velocity.x) - model_error
a = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.acceleration.x)
j = np.zeros(len(T_IDXS_MPC))
else:
x = np.zeros(len(T_IDXS_MPC))
v = np.zeros(len(T_IDXS_MPC))
a = np.zeros(len(T_IDXS_MPC))
j = np.zeros(len(T_IDXS_MPC))
if taco_tune:
max_lat_accel = interp(v_ego, [5, 10, 20], [1.5, 2.0, 3.0])
curvatures = np.interp(T_IDXS_MPC, ModelConstants.T_IDXS, model_msg.orientationRate.z) / np.clip(v, 0.3, 100.0)
max_v = np.sqrt(max_lat_accel / (np.abs(curvatures) + 1e-3)) - 2.0
v = np.minimum(max_v, v)
return x, v, a, j
def update(self, sm):
self.mpc.mode = 'blended' if sm['controlsState'].experimentalMode else 'acc'
v_ego = sm['carState'].vEgo
v_cruise_kph = min(sm['controlsState'].vCruise, V_CRUISE_MAX)
v_cruise = v_cruise_kph * CV.KPH_TO_MS
long_control_off = sm['controlsState'].longControlState == LongCtrlState.off
force_slow_decel = sm['controlsState'].forceDecel
# Reset current state when not engaged, or user is controlling the speed
# clearpilot - might need changing
reset_state = long_control_off if self.CP.openpilotLongitudinalControl else not sm['controlsState'].enabled
# No change cost when user is controlling the speed, or when standstill
prev_accel_constraint = not (reset_state or sm['carState'].standstill)
accel_limits = [sm['frogpilotPlan'].minAcceleration, sm['frogpilotPlan'].maxAcceleration]
if self.mpc.mode == 'acc':
accel_limits_turns = limit_accel_in_turns(v_ego, sm['carState'].steeringAngleDeg, accel_limits, self.CP)
else:
accel_limits_turns = [ACCEL_MIN, ACCEL_MAX]
if reset_state:
self.v_desired_filter.x = v_ego
# Clip aEgo to cruise limits to prevent large accelerations when becoming active
self.a_desired = clip(sm['carState'].aEgo, accel_limits[0], accel_limits[1])
# Prevent divergence, smooth in current v_ego
self.v_desired_filter.x = max(0.0, self.v_desired_filter.update(v_ego))
# Compute model v_ego error
self.v_model_error = get_speed_error(sm['modelV2'], v_ego)
if force_slow_decel:
v_cruise = 0.0
# clip limits, cannot init MPC outside of bounds
accel_limits_turns[0] = min(accel_limits_turns[0], self.a_desired + 0.05)
accel_limits_turns[1] = max(accel_limits_turns[1], self.a_desired - 0.05)
if self.radarless_model:
model_leads = list(sm['modelV2'].leadsV3)
# TODO lead state should be invalidated if its different point than the previous one
lead_states = [self.lead_one, self.lead_two]
for index in range(len(lead_states)):
if len(model_leads) > index:
model_lead = model_leads[index]
lead_states[index].update(model_lead.x[0], model_lead.y[0], model_lead.v[0], model_lead.a[0], model_lead.prob)
else:
lead_states[index].reset()
else:
self.lead_one = sm['radarState'].leadOne
self.lead_two = sm['radarState'].leadTwo
self.mpc.set_weights(sm['frogpilotPlan'].jerk, prev_accel_constraint, personality=sm['controlsState'].personality)
self.mpc.set_accel_limits(accel_limits_turns[0], accel_limits_turns[1])
self.mpc.set_cur_state(self.v_desired_filter.x, self.a_desired)
x, v, a, j = self.parse_model(sm['modelV2'], self.v_model_error, v_ego, self.taco_tune)
self.mpc.update(self.lead_one, self.lead_two, sm['frogpilotPlan'].vCruise, x, v, a, j, self.radarless_model, sm['frogpilotPlan'].tFollow,
sm['frogpilotCarControl'].trafficModeActive, personality=sm['controlsState'].personality)
self.v_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, self.mpc.v_solution)
self.a_desired_trajectory_full = np.interp(ModelConstants.T_IDXS, T_IDXS_MPC, self.mpc.a_solution)
self.v_desired_trajectory = self.v_desired_trajectory_full[:CONTROL_N]
self.a_desired_trajectory = self.a_desired_trajectory_full[:CONTROL_N]
self.j_desired_trajectory = np.interp(ModelConstants.T_IDXS[:CONTROL_N], T_IDXS_MPC[:-1], self.mpc.j_solution)
# TODO counter is only needed because radar is glitchy, remove once radar is gone
self.fcw = self.mpc.crash_cnt > 2 and not sm['carState'].standstill
if self.fcw:
cloudlog.info("FCW triggered")
# Interpolate 0.05 seconds and save as starting point for next iteration
a_prev = self.a_desired
self.a_desired = float(interp(self.dt, ModelConstants.T_IDXS[:CONTROL_N], self.a_desired_trajectory))
self.v_desired_filter.x = self.v_desired_filter.x + self.dt * (self.a_desired + a_prev) / 2.0
if self.params_memory.get_bool("FrogPilotTogglesUpdated"):
self.update_frogpilot_params()
def update_frogpilot_params(self):
lateral_tune = self.params.get_bool("LateralTune")
self.taco_tune = lateral_tune and self.params.get_bool("TacoTune") and not self.release
def publish(self, sm, pm):
plan_send = messaging.new_message('longitudinalPlan')
plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState'])
longitudinalPlan = plan_send.longitudinalPlan
longitudinalPlan.modelMonoTime = sm.logMonoTime['modelV2']
longitudinalPlan.processingDelay = (plan_send.logMonoTime / 1e9) - sm.logMonoTime['modelV2']
longitudinalPlan.speeds = self.v_desired_trajectory.tolist()
# print("LONG PLAN SPEEDS", sys.stderr)
# print(longitudinalPlan.speeds, sys.stderr)
# LONG PLAN SPEEDS <_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>
# [10.240411, 10.242371, 10.248251, 10.257222, 10.267875, 10.281571, 10.285583, 10.283464, 10.281022, 10.281429, 10.281885, 10.282397, 10.282978, 10.283609, 10.282832, 10.281243, 10.279544] <_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>
longitudinalPlan.accels = self.a_desired_trajectory.tolist()
longitudinalPlan.jerks = self.j_desired_trajectory.tolist()
longitudinalPlan.hasLead = self.lead_one.status
longitudinalPlan.longitudinalPlanSource = self.mpc.source
longitudinalPlan.fcw = self.fcw
longitudinalPlan.solverExecutionTime = self.mpc.solve_time
pm.send('longitudinalPlan', plan_send)

75
selfdrive/controls/lib/pid.py Executable file
View File

@@ -0,0 +1,75 @@
import numpy as np
from numbers import Number
from openpilot.common.numpy_fast import clip, interp
class PIDController():
def __init__(self, k_p, k_i, k_f=0., k_d=0., pos_limit=1e308, neg_limit=-1e308, rate=100):
self._k_p = k_p
self._k_i = k_i
self._k_d = k_d
self.k_f = k_f # feedforward gain
if isinstance(self._k_p, Number):
self._k_p = [[0], [self._k_p]]
if isinstance(self._k_i, Number):
self._k_i = [[0], [self._k_i]]
if isinstance(self._k_d, Number):
self._k_d = [[0], [self._k_d]]
self.pos_limit = pos_limit
self.neg_limit = neg_limit
self.i_unwind_rate = 0.3 / rate
self.i_rate = 1.0 / rate
self.speed = 0.0
self.reset()
@property
def k_p(self):
return interp(self.speed, self._k_p[0], self._k_p[1])
@property
def k_i(self):
return interp(self.speed, self._k_i[0], self._k_i[1])
@property
def k_d(self):
return interp(self.speed, self._k_d[0], self._k_d[1])
@property
def error_integral(self):
return self.i/self.k_i
def reset(self):
self.p = 0.0
self.i = 0.0
self.d = 0.0
self.f = 0.0
self.control = 0
def update(self, error, error_rate=0.0, speed=0.0, override=False, feedforward=0., freeze_integrator=False):
self.speed = speed
self.p = float(error) * self.k_p
self.f = feedforward * self.k_f
self.d = error_rate * self.k_d
if override:
self.i -= self.i_unwind_rate * float(np.sign(self.i))
else:
i = self.i + error * self.k_i * self.i_rate
control = self.p + i + self.d + self.f
# Update when changing i will move the control away from the limits
# or when i will move towards the sign of the error
if ((error >= 0 and (control <= self.pos_limit or i < 0.0)) or
(error <= 0 and (control >= self.neg_limit or i > 0.0))) and \
not freeze_integrator:
self.i = i
control = self.p + self.i + self.d + self.f
self.control = clip(control, self.neg_limit, self.pos_limit)
return self.control

View File

View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python3
import random
import unittest
from openpilot.selfdrive.controls.lib.events import Alert, EVENTS
from openpilot.selfdrive.controls.lib.alertmanager import AlertManager
class TestAlertManager(unittest.TestCase):
def test_duration(self):
"""
Enforce that an alert lasts for max(alert duration, duration the alert is added)
"""
for duration in range(1, 100):
alert = None
while not isinstance(alert, Alert):
event = random.choice([e for e in EVENTS.values() if len(e)])
alert = random.choice(list(event.values()))
alert.duration = duration
# check two cases:
# - alert is added to AM for <= the alert's duration
# - alert is added to AM for > alert's duration
for greater in (True, False):
if greater:
add_duration = duration + random.randint(1, 10)
else:
add_duration = random.randint(1, duration)
show_duration = max(duration, add_duration)
AM = AlertManager()
for frame in range(duration+10):
if frame < add_duration:
AM.add_many(frame, [alert, ])
current_alert = AM.process_alerts(frame, {})
shown = current_alert is not None
should_show = frame <= show_duration
self.assertEqual(shown, should_show, msg=f"{frame=} {add_duration=} {duration=}")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,43 @@
#!/usr/bin/env python3
import unittest
from parameterized import parameterized
from cereal import car, log
from openpilot.selfdrive.car.car_helpers import interfaces
from openpilot.selfdrive.car.honda.values import CAR as HONDA
from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA
from openpilot.selfdrive.car.nissan.values import CAR as NISSAN
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
from openpilot.common.mock.generators import generate_liveLocationKalman
class TestLatControl(unittest.TestCase):
@parameterized.expand([(HONDA.CIVIC, LatControlPID), (TOYOTA.RAV4, LatControlTorque), (NISSAN.LEAF, LatControlAngle)])
def test_saturation(self, car_name, controller):
CarInterface, CarController, CarState = interfaces[car_name]
CP = CarInterface.get_non_essential_params(car_name)
CI = CarInterface(CP, CarController, CarState)
VM = VehicleModel(CP)
controller = controller(CP, CI)
CS = car.CarState.new_message()
CS.vEgo = 30
CS.steeringPressed = False
params = log.LiveParametersData.new_message()
llk = generate_liveLocationKalman()
for _ in range(1000):
_, _, lac_log = controller.update(True, CS, VM, params, False, 1, llk)
self.assertTrue(lac_log.saturated)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
import math
import unittest
import numpy as np
from control import StateSpace
from openpilot.selfdrive.car.honda.interface import CarInterface
from openpilot.selfdrive.car.honda.values import CAR
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel, dyn_ss_sol, create_dyn_state_matrices
class TestVehicleModel(unittest.TestCase):
def setUp(self):
CP = CarInterface.get_non_essential_params(CAR.CIVIC)
self.VM = VehicleModel(CP)
def test_round_trip_yaw_rate(self):
# TODO: fix VM to work at zero speed
for u in np.linspace(1, 30, num=10):
for roll in np.linspace(math.radians(-20), math.radians(20), num=11):
for sa in np.linspace(math.radians(-20), math.radians(20), num=11):
yr = self.VM.yaw_rate(sa, u, roll)
new_sa = self.VM.get_steer_from_yaw_rate(yr, u, roll)
self.assertAlmostEqual(sa, new_sa)
def test_dyn_ss_sol_against_yaw_rate(self):
"""Verify that the yaw_rate helper function matches the results
from the state space model."""
for roll in np.linspace(math.radians(-20), math.radians(20), num=11):
for u in np.linspace(1, 30, num=10):
for sa in np.linspace(math.radians(-20), math.radians(20), num=11):
# Compute yaw rate based on state space model
_, yr1 = dyn_ss_sol(sa, u, roll, self.VM)
# Compute yaw rate using direct computations
yr2 = self.VM.yaw_rate(sa, u, roll)
self.assertAlmostEqual(float(yr1[0]), yr2)
def test_syn_ss_sol_simulate(self):
"""Verifies that dyn_ss_sol matches a simulation"""
for roll in np.linspace(math.radians(-20), math.radians(20), num=11):
for u in np.linspace(1, 30, num=10):
A, B = create_dyn_state_matrices(u, self.VM)
# Convert to discrete time system
ss = StateSpace(A, B, np.eye(2), np.zeros((2, 2)))
ss = ss.sample(0.01)
for sa in np.linspace(math.radians(-20), math.radians(20), num=11):
inp = np.array([[sa], [roll]])
# Simulate for 1 second
x1 = np.zeros((2, 1))
for _ in range(100):
x1 = ss.A @ x1 + ss.B @ inp
# Compute steady state solution directly
x2 = dyn_ss_sol(sa, u, roll, self.VM)
np.testing.assert_almost_equal(x1, x2, decimal=3)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,230 @@
#!/usr/bin/env python3
"""
Dynamic bicycle model from "The Science of Vehicle Dynamics (2014), M. Guiggiani"
The state is x = [v, r]^T
with v lateral speed [m/s], and r rotational speed [rad/s]
The input u is the steering angle [rad], and roll [rad]
The system is defined by
x_dot = A*x + B*u
A depends on longitudinal speed, u [m/s], and vehicle parameters CP
"""
import numpy as np
from numpy.linalg import solve
from cereal import car
ACCELERATION_DUE_TO_GRAVITY = 9.8
class VehicleModel:
def __init__(self, CP: car.CarParams):
"""
Args:
CP: Car Parameters
"""
# for math readability, convert long names car params into short names
self.m: float = CP.mass
self.j: float = CP.rotationalInertia
self.l: float = CP.wheelbase
self.aF: float = CP.centerToFront
self.aR: float = CP.wheelbase - CP.centerToFront
self.chi: float = CP.steerRatioRear
self.cF_orig: float = CP.tireStiffnessFront
self.cR_orig: float = CP.tireStiffnessRear
self.update_params(1.0, CP.steerRatio)
def update_params(self, stiffness_factor: float, steer_ratio: float) -> None:
"""Update the vehicle model with a new stiffness factor and steer ratio"""
self.cF: float = stiffness_factor * self.cF_orig
self.cR: float = stiffness_factor * self.cR_orig
self.sR: float = steer_ratio
def steady_state_sol(self, sa: float, u: float, roll: float) -> np.ndarray:
"""Returns the steady state solution.
If the speed is too low we can't use the dynamic model (tire slip is undefined),
we then have to use the kinematic model
Args:
sa: Steering wheel angle [rad]
u: Speed [m/s]
roll: Road Roll [rad]
Returns:
2x1 matrix with steady state solution (lateral speed, rotational speed)
"""
if u > 0.1:
return dyn_ss_sol(sa, u, roll, self)
else:
return kin_ss_sol(sa, u, self)
def calc_curvature(self, sa: float, u: float, roll: float) -> float:
"""Returns the curvature. Multiplied by the speed this will give the yaw rate.
Args:
sa: Steering wheel angle [rad]
u: Speed [m/s]
roll: Road Roll [rad]
Returns:
Curvature factor [1/m]
"""
return (self.curvature_factor(u) * sa / self.sR) + self.roll_compensation(roll, u)
def curvature_factor(self, u: float) -> float:
"""Returns the curvature factor.
Multiplied by wheel angle (not steering wheel angle) this will give the curvature.
Args:
u: Speed [m/s]
Returns:
Curvature factor [1/m]
"""
sf = calc_slip_factor(self)
return (1. - self.chi) / (1. - sf * u**2) / self.l
def get_steer_from_curvature(self, curv: float, u: float, roll: float) -> float:
"""Calculates the required steering wheel angle for a given curvature
Args:
curv: Desired curvature [1/m]
u: Speed [m/s]
roll: Road Roll [rad]
Returns:
Steering wheel angle [rad]
"""
return (curv - self.roll_compensation(roll, u)) * self.sR * 1.0 / self.curvature_factor(u)
def roll_compensation(self, roll: float, u: float) -> float:
"""Calculates the roll-compensation to curvature
Args:
roll: Road Roll [rad]
u: Speed [m/s]
Returns:
Roll compensation curvature [rad]
"""
sf = calc_slip_factor(self)
if abs(sf) < 1e-6:
return 0
else:
return (ACCELERATION_DUE_TO_GRAVITY * roll) / ((1 / sf) - u**2)
def get_steer_from_yaw_rate(self, yaw_rate: float, u: float, roll: float) -> float:
"""Calculates the required steering wheel angle for a given yaw_rate
Args:
yaw_rate: Desired yaw rate [rad/s]
u: Speed [m/s]
roll: Road Roll [rad]
Returns:
Steering wheel angle [rad]
"""
curv = yaw_rate / u
return self.get_steer_from_curvature(curv, u, roll)
def yaw_rate(self, sa: float, u: float, roll: float) -> float:
"""Calculate yaw rate
Args:
sa: Steering wheel angle [rad]
u: Speed [m/s]
roll: Road Roll [rad]
Returns:
Yaw rate [rad/s]
"""
return self.calc_curvature(sa, u, roll) * u
def kin_ss_sol(sa: float, u: float, VM: VehicleModel) -> np.ndarray:
"""Calculate the steady state solution at low speeds
At low speeds the tire slip is undefined, so a kinematic
model is used.
Args:
sa: Steering angle [rad]
u: Speed [m/s]
VM: Vehicle model
Returns:
2x1 matrix with steady state solution
"""
K = np.zeros((2, 1))
K[0, 0] = VM.aR / VM.sR / VM.l * u
K[1, 0] = 1. / VM.sR / VM.l * u
return K * sa
def create_dyn_state_matrices(u: float, VM: VehicleModel) -> tuple[np.ndarray, np.ndarray]:
"""Returns the A and B matrix for the dynamics system
Args:
u: Vehicle speed [m/s]
VM: Vehicle model
Returns:
A tuple with the 2x2 A matrix, and 2x2 B matrix
Parameters in the vehicle model:
cF: Tire stiffness Front [N/rad]
cR: Tire stiffness Front [N/rad]
aF: Distance from CG to front wheels [m]
aR: Distance from CG to rear wheels [m]
m: Mass [kg]
j: Rotational inertia [kg m^2]
sR: Steering ratio [-]
chi: Steer ratio rear [-]
"""
A = np.zeros((2, 2))
B = np.zeros((2, 2))
A[0, 0] = - (VM.cF + VM.cR) / (VM.m * u)
A[0, 1] = - (VM.cF * VM.aF - VM.cR * VM.aR) / (VM.m * u) - u
A[1, 0] = - (VM.cF * VM.aF - VM.cR * VM.aR) / (VM.j * u)
A[1, 1] = - (VM.cF * VM.aF**2 + VM.cR * VM.aR**2) / (VM.j * u)
# Steering input
B[0, 0] = (VM.cF + VM.chi * VM.cR) / VM.m / VM.sR
B[1, 0] = (VM.cF * VM.aF - VM.chi * VM.cR * VM.aR) / VM.j / VM.sR
# Roll input
B[0, 1] = -ACCELERATION_DUE_TO_GRAVITY
return A, B
def dyn_ss_sol(sa: float, u: float, roll: float, VM: VehicleModel) -> np.ndarray:
"""Calculate the steady state solution when x_dot = 0,
Ax + Bu = 0 => x = -A^{-1} B u
Args:
sa: Steering angle [rad]
u: Speed [m/s]
roll: Road Roll [rad]
VM: Vehicle model
Returns:
2x1 matrix with steady state solution
"""
A, B = create_dyn_state_matrices(u, VM)
inp = np.array([[sa], [roll]])
return -solve(A, B) @ inp # type: ignore
def calc_slip_factor(VM: VehicleModel) -> float:
"""The slip factor is a measure of how the curvature changes with speed
it's positive for Oversteering vehicle, negative (usual case) otherwise.
"""
return VM.m * (VM.cF * VM.aF - VM.cR * VM.aR) / (VM.l**2 * VM.cF * VM.cR)