telemetry: diff-based CSV logger with ZMQ IPC

- telemetry.py: client library, tlog(group, data) sends JSON over ZMQ PUSH
- telemetryd.py: collector process, diffs against previous state per group,
  writes only changed values to /data/log2/{session}/telemetry.csv
- Registered as always-run managed process in process_config.py

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-12 23:41:16 +00:00
parent 36221c4b10
commit e3bdae8b5e
3 changed files with 102 additions and 0 deletions

View File

@@ -0,0 +1,34 @@
"""
ClearPilot telemetry client library.
Usage from any process:
from openpilot.selfdrive.clearpilot.telemetry import tlog
tlog("canbus", {"speed": 45.2, "speed_limit": 60})
Sends JSON packets over ZMQ PUSH to telemetryd, which diffs and writes CSV.
"""
import json
import time
import zmq
TELEMETRY_SOCK = "ipc:///tmp/clearpilot_telemetry"
_ctx = None
_sock = None
def tlog(group: str, data: dict):
"""Log a telemetry packet. Only changed values will be written to CSV by telemetryd."""
global _ctx, _sock
if _sock is None:
_ctx = zmq.Context.instance()
_sock = _ctx.socket(zmq.PUSH)
_sock.setsockopt(zmq.LINGER, 0)
_sock.setsockopt(zmq.SNDHWM, 100)
_sock.connect(TELEMETRY_SOCK)
msg = json.dumps({"ts": time.time(), "group": group, "data": data})
try:
_sock.send_string(msg, zmq.NOBLOCK)
except zmq.Again:
pass # drop if collector isn't running or queue full

View File

@@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""
ClearPilot telemetry collector.
Receives telemetry packets from any process via ZMQ, diffs against previous
state per group, and writes only changed values to CSV.
CSV format: timestamp,group,key,value
"""
import csv
import json
import os
import zmq
from openpilot.selfdrive.clearpilot.telemetry import TELEMETRY_SOCK
from openpilot.selfdrive.manager.process import _log_dir
def main():
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.PULL)
sock.setsockopt(zmq.RCVHWM, 1000)
sock.bind(TELEMETRY_SOCK)
csv_path = os.path.join(_log_dir, "telemetry.csv")
state: dict[str, dict] = {} # group -> {key: last_value}
with open(csv_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["timestamp", "group", "key", "value"])
f.flush()
while True:
try:
raw = sock.recv_string()
except zmq.ZMQError:
continue
try:
pkt = json.loads(raw)
except json.JSONDecodeError:
continue
ts = pkt.get("ts", 0)
group = pkt.get("group", "")
data = pkt.get("data", {})
if group not in state:
state[group] = {}
prev = state[group]
changed = False
for key, value in data.items():
# Convert to string for comparison so floats/ints/bools all diff correctly
str_val = str(value)
if key not in prev or prev[key] != str_val:
writer.writerow([f"{ts:.6f}", group, key, str_val])
prev[key] = str_val
changed = True
if changed:
f.flush()
if __name__ == "__main__":
main()

View File

@@ -109,6 +109,7 @@ procs = [
# ClearPilot processes
NativeProcess("dashcamd", "selfdrive/clearpilot", ["./dashcamd"], dashcam_should_run),
PythonProcess("telemetryd", "selfdrive.clearpilot.telemetryd", always_run),
]
managed_processes = {p.name: p for p in procs}