Files
clearpilot/selfdrive/manager/process.py
Brian Hanson 4f16a8953a GPS fix + log directory redesign + dashcamd binary
GPS: fix AT response parsing (strip mmcli `response: '...'` wrapper),
fix capnp field names (horizontalAccuracy, hasFix), set system clock
directly from first GPS fix when time is invalid, kill system gpsd on
startup.

Logging: replace module-level log dir creation with init_log_dir()
called from manager_init(). Active session always at /data/log2/current
(real dir until time resolves, then symlink to timestamped dir). Add
LogDirInitialized param. Redirect both stdout+stderr for all processes.

Also: thorough process cleanup in launch scripts, dashcamd binary,
CLAUDE.md updates for GPS/telemetry/bench/logging docs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 04:51:33 +00:00

433 lines
13 KiB
Python
Executable File

import importlib
import os
import signal
import struct
import sys
import datetime
import time
import subprocess
from collections.abc import Callable, ValuesView
from abc import ABC, abstractmethod
from multiprocessing import Process
from setproctitle import setproctitle
from cereal import car, log
import cereal.messaging as messaging
import openpilot.selfdrive.sentry as sentry
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.common.time import system_time_valid
WATCHDOG_FN = "/dev/shm/wd_"  # prefix: each watched process writes a ns timestamp to /dev/shm/wd_<pid>
ENABLE_WATCHDOG = os.getenv("NO_WATCHDOG") is None  # set NO_WATCHDOG to disable watchdog restarts
# CLEARPILOT: logging directory and session log
# init_log_dir() must be called once from manager_init() before any process starts.
# Until then, _log_dir and session_log are usable but write to a NullHandler.
import logging
_log_dir = "/data/log2/current"  # active session log dir; re-pointed by init_log_dir()/update_log_dir_timestamp()
_time_resolved = False  # latches True once 'current' has been renamed to a real timestamp
_session_handler = None  # FileHandler on <log_dir>/session.log, installed by init_log_dir()
session_log = logging.getLogger("clearpilot.session")
session_log.setLevel(logging.DEBUG)
session_log.addHandler(logging.NullHandler())  # no-op sink so logging before init_log_dir() is harmless
def init_log_dir():
  """Create /data/log2/current as a real directory for this session.

  Called once from manager_init() before any managed process starts.
  Whatever a previous session left at 'current' is cleared first:
    - a symlink (time resolved last session): the symlink is removed;
    - a real directory (time never resolved): renamed to a timestamp derived
      from its session.log mtime (suffixed on collision);
    - any other stray entry (e.g. a plain file): removed, since it would make
      the makedirs() below raise FileExistsError.
  Finally a fresh directory is created and the session logger is switched
  from its NullHandler to a FileHandler on <dir>/session.log.
  """
  global _log_dir, _time_resolved, _session_handler
  log_base = "/data/log2"
  current = os.path.join(log_base, "current")
  os.makedirs(log_base, exist_ok=True)

  if os.path.islink(current):
    # If 'current' is a symlink, just remove the symlink
    os.unlink(current)
  elif os.path.isdir(current):
    # If 'current' is a real directory (leftover from previous session that
    # never got time-resolved), rename it out of the way.
    # Use mtime of session.log (or the dir itself) for the rename.
    session_file = os.path.join(current, "session.log")
    try:
      mtime = os.path.getmtime(session_file) if os.path.exists(session_file) else os.path.getmtime(current)
    except OSError:
      # file vanished between the exists() check and getmtime(); fall back
      mtime = time.time()
    ts = datetime.datetime.fromtimestamp(mtime).strftime('%Y-%m-%d-%H-%M-%S')
    dest = os.path.join(log_base, ts)
    # Avoid collision with an existing dir from the same second
    if os.path.exists(dest):
      dest = dest + f"-{int(time.monotonic())}"
    try:
      os.rename(current, dest)
    except OSError:
      pass  # best effort; worst case the old dir is reused below
  elif os.path.exists(current):
    # Stray non-directory entry (plain file) would break makedirs() below
    try:
      os.unlink(current)
    except OSError:
      pass

  os.makedirs(current, exist_ok=True)
  _log_dir = current
  _time_resolved = False

  # Set up session log file handler, replacing the module-level NullHandler
  _session_handler = logging.FileHandler(os.path.join(_log_dir, "session.log"))
  _session_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
  session_log.handlers.clear()
  session_log.addHandler(_session_handler)
  session_log.info("session started, log dir: %s", _log_dir)
def update_log_dir_timestamp():
  """Rename /data/log2/current to a real timestamp and replace it with a
  symlink once system time is valid.

  Safe to call repeatedly: no-ops until system_time_valid() and only runs
  once per session (the _time_resolved latch). On success the session log
  handler is reopened on the renamed directory and the LogDirInitialized
  param is set so consumers know the directory name is final.
  """
  global _log_dir, _time_resolved, _session_handler
  if _time_resolved:
    return
  if not system_time_valid():
    return

  log_base = "/data/log2"
  current = os.path.join(log_base, "current")
  ts_name = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
  new_dir = os.path.join(log_base, ts_name)
  try:
    os.rename(current, new_dir)
    # Create symlink: current -> YYYY-MM-DD-HH-MM-SS (relative, same dir)
    os.symlink(ts_name, current)
    _log_dir = new_dir
    _time_resolved = True
    # Re-point session log handler (open files follow the inode, but
    # new opens should go through the symlink — update handler for clarity)
    session_log.removeHandler(_session_handler)
    _session_handler.close()
    _session_handler = logging.FileHandler(os.path.join(_log_dir, "session.log"))
    _session_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    session_log.addHandler(_session_handler)
    session_log.info("log directory renamed to %s", _log_dir)
    # Signal via param that the directory has been time-resolved.
    # Params is already imported at module level; the write is best-effort
    # so a params failure can never take down the manager.
    try:
      Params().put("LogDirInitialized", "1")
    except Exception:
      pass
  except OSError:
    pass  # e.g. 'current' missing; latch stays unset so we retry next call
def launcher(proc: str, name: str, log_path: str) -> None:
  """Entry point for a forked Python process: redirect both output streams
  to the per-process log file, then import the module and run its main()."""
  # CLEARPILOT: redirect stderr to per-process log file
  try:
    fh = open(log_path, 'a')
    for stream in (sys.stderr, sys.stdout):
      os.dup2(fh.fileno(), stream.fileno())
  except Exception as e:
    print(f"CLEARPILOT: stderr redirect failed for {name}: {e}", file=sys.stderr)

  try:
    # import and rename the process
    mod = importlib.import_module(proc)
    setproctitle(proc)

    # we forked, so the parent's messaging context must not be reused
    messaging.context = messaging.Context()

    # tag this daemon's log/error output
    cloudlog.bind(daemon=name)
    sentry.set_tag("daemon", name)

    # hand control to the process
    mod.main()
  except KeyboardInterrupt:
    cloudlog.warning(f"child {proc} got SIGINT")
  except Exception:
    # can't install the crash handler because sys.excepthook doesn't play nice
    # with threads, so catch it here.
    sentry.capture_exception()
    raise
def nativelauncher(pargs: list[str], cwd: str, name: str, log_path: str) -> None:
  """Replace this forked process with a native binary, with stdout and
  stderr redirected to the per-process log file."""
  os.environ['MANAGER_DAEMON'] = name

  # CLEARPILOT: redirect stderr and stdout to per-process log file
  try:
    fh = open(log_path, 'a')
    fd = fh.fileno()
    os.dup2(fd, sys.stderr.fileno())
    os.dup2(fd, sys.stdout.fileno())
  except Exception as e:
    print(f"CLEARPILOT: stderr redirect failed for {name}: {e}", file=sys.stderr)

  # exec the binary; on success this call never returns
  os.chdir(cwd)
  os.execvp(pargs[0], pargs)
def join_process(process: Process, timeout: float) -> None:
  """Wait up to `timeout` seconds for `process` to exit.

  Process.join(timeout) can hang due to a python 3 bug
  (https://bugs.python.org/issue28382), so poll exitcode instead.
  """
  deadline = time.monotonic() + timeout
  while process.exitcode is None and time.monotonic() < deadline:
    time.sleep(0.001)
class ManagerProcess(ABC):
  """Base class for a process the manager supervises: holds the process
  handle, watchdog bookkeeping, and shutdown state, and implements
  signal/stop/restart. Subclasses implement prepare() and start()."""
  daemon = False                       # survives manager restart (see DaemonProcess)
  sigkill = False                      # stop() uses SIGKILL instead of SIGINT
  should_run: Callable[[bool, Params, car.CarParams], bool]
  proc: Process | None = None
  enabled = True
  name = ""
  last_watchdog_time = 0               # ns timestamp last read from the watchdog file
  watchdog_max_dt: int | None = None   # seconds without a pet before restart; None disables
  # Default for all subclasses: check_watchdog() reads this, but previously only
  # NativeProcess.__init__ set it, so a PythonProcess with watchdog_max_dt raised
  # AttributeError. When True, a timed-out process that already exited is
  # restarted even if it never petted the watchdog.
  always_watchdog = False
  watchdog_seen = False                # process has petted its watchdog at least once
  shutting_down = False                # a stop() has been issued but not confirmed

  @abstractmethod
  def prepare(self) -> None:
    pass

  @abstractmethod
  def start(self) -> None:
    pass

  def restart(self) -> None:
    """Hard-restart: SIGKILL then start again."""
    self.stop(sig=signal.SIGKILL)
    self.start()

  def check_watchdog(self, started: bool) -> None:
    """Restart the process if it hasn't petted its watchdog file recently."""
    if self.watchdog_max_dt is None or self.proc is None:
      return

    try:
      fn = WATCHDOG_FN + str(self.proc.pid)
      with open(fn, "rb") as f:
        # watchdog file holds a single native-endian uint64 ns timestamp
        # TODO: why can't pylint find struct.unpack?
        self.last_watchdog_time = struct.unpack('Q', f.read())[0]
    except Exception:
      pass  # file may not exist yet; keep the last value

    dt = time.monotonic() - self.last_watchdog_time / 1e9

    if dt > self.watchdog_max_dt:
      # restart if the process has petted the watchdog at least once, or
      # (always_watchdog) if it already died without ever petting it
      if (self.watchdog_seen or (self.always_watchdog and self.proc.exitcode is not None)) and ENABLE_WATCHDOG:
        cloudlog.error(f"Watchdog timeout for {self.name} (exitcode {self.proc.exitcode}) restarting ({started=})")
        session_log.warning("watchdog timeout for %s (exitcode %s), restarting", self.name, self.proc.exitcode)
        self.restart()
    else:
      self.watchdog_seen = True

  def stop(self, retry: bool = True, block: bool = True, sig: signal.Signals | None = None) -> int | None:
    """Stop the process.

    Returns its exit code, or None when nothing was running or when called
    non-blocking before the process has exited. With retry, escalates to
    SIGKILL if the process survives 5s after the first signal.
    """
    if self.proc is None:
      return None

    if self.proc.exitcode is None:
      if not self.shutting_down:
        cloudlog.info(f"killing {self.name}")
        if sig is None:
          sig = signal.SIGKILL if self.sigkill else signal.SIGINT
        self.signal(sig)
        self.shutting_down = True

        if not block:
          return None

      join_process(self.proc, 5)

      # If process failed to die send SIGKILL
      if self.proc.exitcode is None and retry:
        cloudlog.info(f"killing {self.name} with SIGKILL")
        self.signal(signal.SIGKILL)
        self.proc.join()

    ret = self.proc.exitcode
    cloudlog.info(f"{self.name} is dead with {ret}")
    if ret is not None and ret != 0:
      session_log.error("process %s died with exit code %s", self.name, ret)
    elif ret == 0:
      session_log.info("process %s stopped (exit 0)", self.name)

    if self.proc.exitcode is not None:
      self.shutting_down = False
      self.proc = None

    return ret

  def signal(self, sig: int) -> None:
    """Send `sig` to the process if it is alive and has a pid."""
    if self.proc is None:
      return

    # Don't signal if already exited
    if self.proc.exitcode is not None and self.proc.pid is not None:
      return

    # Can't signal if we don't have a pid
    if self.proc.pid is None:
      return

    cloudlog.info(f"sending signal {sig} to {self.name}")
    os.kill(self.proc.pid, sig)

  def get_process_state_msg(self):
    """Build a cereal ManagerState.ProcessState message for this process."""
    state = log.ManagerState.ProcessState.new_message()
    state.name = self.name
    if self.proc:
      state.running = self.proc.is_alive()
      state.shouldBeRunning = self.proc is not None and not self.shutting_down
      state.pid = self.proc.pid or 0
      state.exitCode = self.proc.exitcode or 0
    return state
class NativeProcess(ManagerProcess):
  """A managed process that execs a native binary via nativelauncher."""

  def __init__(self, name, cwd, cmdline, should_run, enabled=True, sigkill=False, watchdog_max_dt=None, always_watchdog=False):
    self.name = name
    self.cwd = cwd
    self.cmdline = cmdline
    self.should_run = should_run
    self.enabled = enabled
    self.sigkill = sigkill
    self.watchdog_max_dt = watchdog_max_dt
    self.launcher = nativelauncher
    self.always_watchdog = always_watchdog

  def prepare(self) -> None:
    # nothing to preload for a native binary
    pass

  def start(self) -> None:
    # a non-blocking stop may still be in flight; finish it before restarting
    if self.shutting_down:
      self.stop()

    if self.proc is not None:
      return

    # CLEARPILOT: per-process log in the session dir; touch it so it exists
    # even if the child's redirect fails
    log_path = _log_dir + "/" + self.name + ".log"
    open(log_path, 'a').close()

    cwd = os.path.join(BASEDIR, self.cwd)
    cloudlog.info(f"starting process {self.name}")
    session_log.info("starting %s", self.name)
    self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name, log_path))
    self.proc.start()
    self.watchdog_seen = False
    self.shutting_down = False
class PythonProcess(ManagerProcess):
  """A managed process that forks and runs a Python module's main()."""

  def __init__(self, name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None):
    self.name = name
    self.module = module
    self.should_run = should_run
    self.enabled = enabled
    self.sigkill = sigkill
    self.watchdog_max_dt = watchdog_max_dt
    self.launcher = launcher

  def prepare(self) -> None:
    # pre-import so forks start fast and import errors surface in the manager
    if self.enabled:
      cloudlog.info(f"preimporting {self.module}")
      importlib.import_module(self.module)

  def start(self) -> None:
    # a non-blocking stop may still be in flight; finish it before restarting
    if self.shutting_down:
      self.stop()

    if self.proc is not None:
      return

    # CLEARPILOT: per-process log in the session dir; touch it so it exists
    # even if the child's redirect fails
    log_path = _log_dir + "/" + self.name + ".log"
    open(log_path, 'a').close()

    cloudlog.info(f"starting python {self.module}")
    session_log.info("starting %s", self.name)
    self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name, log_path))
    self.proc.start()
    self.watchdog_seen = False
    self.shutting_down = False
class DaemonProcess(ManagerProcess):
  """Python process that has to stay running across manager restart.
  This is used for athena so you don't lose SSH access when restarting manager.

  The child's pid is stored in the `param_name` param; start() is a no-op
  while a process with that pid and the expected cmdline is still alive."""

  def __init__(self, name, module, param_name, enabled=True):
    self.name = name
    self.module = module
    self.param_name = param_name
    self.enabled = enabled
    self.params = None  # lazily constructed in start()

  @staticmethod
  def should_run(started, params, CP):
    # daemons run unconditionally
    return True

  def prepare(self) -> None:
    pass

  def start(self) -> None:
    if self.params is None:
      self.params = Params()

    log_path = _log_dir + "/" + self.name + ".log"

    pid = self.params.get(self.param_name, encoding='utf-8')
    if pid is not None:
      try:
        os.kill(int(pid), 0)  # signal 0: existence/permission check only
        with open(f'/proc/{pid}/cmdline') as f:
          if self.module in f.read():
            # daemon is running
            return
      except OSError:
        # process is dead or /proc entry is gone; FileNotFoundError is a
        # subclass of OSError, so a single clause covers both cases
        pass

    cloudlog.info(f"starting daemon {self.name}")
    session_log.info("starting daemon %s", self.name)
    # Open the log with a context manager so the parent doesn't leak the
    # file object; the child keeps its own duplicated fd after Popen returns.
    with open(log_path, 'a') as log_file:
      proc = subprocess.Popen(['python', '-m', self.module],
                              stdin=subprocess.DEVNULL,
                              stdout=log_file,
                              stderr=subprocess.STDOUT,
                              preexec_fn=os.setpgrp)  # own process group: survives manager exit

    self.params.put(self.param_name, str(proc.pid))

  def stop(self, retry=True, block=True, sig=None) -> None:
    # deliberately never stopped: the daemon outlives the manager
    pass
def ensure_running(procs: ValuesView[ManagerProcess], started: bool, params=None, CP: car.CarParams=None,
                   not_run: list[str] | None=None) -> list[ManagerProcess]:
  """Start every process that should currently run and stop the rest.

  A process runs when it is enabled, not listed in `not_run`, and its
  should_run(started, params, CP) predicate is true. Watchdogs are checked
  for every process. Returns the list of processes that should be running.
  """
  if not_run is None:
    not_run = []

  running = []
  for p in procs:
    wanted = p.enabled and p.name not in not_run and p.should_run(started, params, CP)
    if wanted:
      running.append(p)
    else:
      p.stop(block=False)
    p.check_watchdog(started)

  for p in running:
    p.start()

  return running