3d9143c41b
- Telemetry status bar in onroad UI: temp, fan %, model FPS, standstill
- Fix paramsMemory usage: Params("/dev/shm/params") not "/dev/shm/params/d"
- Telemetry/VPN toggles use ToggleControl with manual paramsMemory writes
- TelemetryEnabled/VpnEnabled registered PERSISTENT, written to memory path
- GPS telemetry: telemetryd subscribes to gpsLocation at 1Hz via cereal
- Nightrider: force CameraWidget bg black to eliminate color bleed border
- Suppress "Always On Lateral active" status bar message
- Re-enable gpsd and dashcamd
- CLAUDE.md: document memory params pattern, speed_limit.calculated usage
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
148 lines
4.1 KiB
Python
148 lines
4.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ClearPilot telemetry collector.
|
|
|
|
Receives telemetry packets from any process via ZMQ, diffs against previous
|
|
state per group, and writes only changed values to CSV.
|
|
|
|
Controlled by TelemetryEnabled param — when toggled on, the first packet
|
|
from each group writes all values (full dump). When toggled off, stops writing.
|
|
|
|
CSV format: timestamp,group,key,value
|
|
"""
|
|
import csv
|
|
import json
|
|
import os
|
|
import shutil
|
|
import time
|
|
import zmq
|
|
|
|
import cereal.messaging as messaging
|
|
from openpilot.common.params import Params
|
|
from openpilot.selfdrive.clearpilot.telemetry import TELEMETRY_SOCK
|
|
from openpilot.selfdrive.manager.process import _log_dir, session_log
|
|
|
|
MIN_DISK_FREE_GB = 5
|
|
DISK_CHECK_INTERVAL = 10 # seconds
|
|
|
|
|
|
def main():
|
|
params = Params("/dev/shm/params")
|
|
ctx = zmq.Context.instance()
|
|
sock = ctx.socket(zmq.PULL)
|
|
sock.setsockopt(zmq.RCVHWM, 1000)
|
|
sock.bind(TELEMETRY_SOCK)
|
|
|
|
# GPS subscriber for location telemetry
|
|
sm = messaging.SubMaster(['gpsLocation'])
|
|
last_gps_log = 0
|
|
|
|
csv_path = os.path.join(_log_dir, "telemetry.csv")
|
|
state: dict[str, dict] = {} # group -> {key: last_value}
|
|
was_enabled = False
|
|
writer = None
|
|
f = None
|
|
last_disk_check = 0
|
|
|
|
while True:
|
|
# Check enable state every iteration (cheap param read)
|
|
enabled = params.get("TelemetryEnabled") == b"1"
|
|
|
|
# Check disk space every 10 seconds while enabled
|
|
if enabled and (time.monotonic() - last_disk_check) > DISK_CHECK_INTERVAL:
|
|
last_disk_check = time.monotonic()
|
|
disk = shutil.disk_usage("/data")
|
|
free_gb = disk.free / (1024 ** 3)
|
|
if free_gb < MIN_DISK_FREE_GB:
|
|
session_log.warning("telemetry disabled: disk free %.1f GB < %d GB", free_gb, MIN_DISK_FREE_GB)
|
|
params.put("TelemetryEnabled", "0")
|
|
enabled = False
|
|
|
|
# Toggled on: open CSV, clear state so first packet is a full dump
|
|
if enabled and not was_enabled:
|
|
f = open(csv_path, "a", newline="")
|
|
writer = csv.writer(f)
|
|
if os.path.getsize(csv_path) == 0:
|
|
writer.writerow(["timestamp", "group", "key", "value"])
|
|
state.clear() # force full dump on first packet per group
|
|
f.flush()
|
|
|
|
# Toggled off: close file
|
|
if not enabled and was_enabled:
|
|
if f:
|
|
f.close()
|
|
f = None
|
|
writer = None
|
|
|
|
was_enabled = enabled
|
|
|
|
# GPS telemetry at most 1Hz — fed through the same diff logic
|
|
if enabled and writer is not None:
|
|
sm.update(0)
|
|
now = time.monotonic()
|
|
if sm.updated['gpsLocation'] and (now - last_gps_log) >= 1.0:
|
|
gps = sm['gpsLocation']
|
|
gps_data = {
|
|
"latitude": f"{gps.latitude:.7f}",
|
|
"longitude": f"{gps.longitude:.7f}",
|
|
"speed": f"{gps.speed:.2f}",
|
|
"altitude": f"{gps.altitude:.1f}",
|
|
"bearing": f"{gps.bearingDeg:.1f}",
|
|
"accuracy": f"{gps.accuracy:.1f}",
|
|
}
|
|
ts = time.time()
|
|
if "gps" not in state:
|
|
state["gps"] = {}
|
|
prev_gps = state["gps"]
|
|
gps_changed = False
|
|
for key, value in gps_data.items():
|
|
if key not in prev_gps or prev_gps[key] != value:
|
|
writer.writerow([f"{ts:.6f}", "gps", key, value])
|
|
prev_gps[key] = value
|
|
gps_changed = True
|
|
if gps_changed:
|
|
f.flush()
|
|
last_gps_log = now
|
|
|
|
# Always drain the socket (even when disabled) to avoid backpressure
|
|
try:
|
|
raw = sock.recv_string(zmq.NOBLOCK)
|
|
except zmq.Again:
|
|
time.sleep(0.01)
|
|
continue
|
|
except zmq.ZMQError:
|
|
time.sleep(0.01)
|
|
continue
|
|
|
|
if not enabled or writer is None:
|
|
continue
|
|
|
|
try:
|
|
pkt = json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
ts = pkt.get("ts", 0)
|
|
group = pkt.get("group", "")
|
|
data = pkt.get("data", {})
|
|
|
|
if group not in state:
|
|
state[group] = {}
|
|
|
|
prev = state[group]
|
|
changed = False
|
|
|
|
for key, value in data.items():
|
|
str_val = str(value)
|
|
if key not in prev or prev[key] != str_val:
|
|
writer.writerow([f"{ts:.6f}", group, key, str_val])
|
|
prev[key] = str_val
|
|
changed = True
|
|
|
|
if changed:
|
|
f.flush()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|