diff --git a/selfdrive/clearpilot/telemetry.py b/selfdrive/clearpilot/telemetry.py new file mode 100644 index 0000000..1ded08f --- /dev/null +++ b/selfdrive/clearpilot/telemetry.py @@ -0,0 +1,34 @@ +""" +ClearPilot telemetry client library. + +Usage from any process: + from openpilot.selfdrive.clearpilot.telemetry import tlog + tlog("canbus", {"speed": 45.2, "speed_limit": 60}) + +Sends JSON packets over ZMQ PUSH to telemetryd, which diffs and writes CSV. +""" +import json +import time +import zmq + +TELEMETRY_SOCK = "ipc:///tmp/clearpilot_telemetry" + +_ctx = None +_sock = None + + +def tlog(group: str, data: dict): + """Log a telemetry packet. Only changed values will be written to CSV by telemetryd.""" + global _ctx, _sock + if _sock is None: + _ctx = zmq.Context.instance() + _sock = _ctx.socket(zmq.PUSH) + _sock.setsockopt(zmq.LINGER, 0) + _sock.setsockopt(zmq.SNDHWM, 100) + _sock.connect(TELEMETRY_SOCK) + + msg = json.dumps({"ts": time.time(), "group": group, "data": data}) + try: + _sock.send_string(msg, zmq.NOBLOCK) + except zmq.Again: + pass # drop if collector isn't running or queue full diff --git a/selfdrive/clearpilot/telemetryd.py b/selfdrive/clearpilot/telemetryd.py new file mode 100644 index 0000000..3a4537d --- /dev/null +++ b/selfdrive/clearpilot/telemetryd.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +""" +ClearPilot telemetry collector. + +Receives telemetry packets from any process via ZMQ, diffs against previous +state per group, and writes only changed values to CSV. + +CSV format: timestamp,group,key,value +""" +import csv +import json +import os +import zmq + +from openpilot.selfdrive.clearpilot.telemetry import TELEMETRY_SOCK +from openpilot.selfdrive.manager.process import _log_dir + + +def main(): + ctx = zmq.Context.instance() + sock = ctx.socket(zmq.PULL) + sock.setsockopt(zmq.RCVHWM, 1000) + sock.bind(TELEMETRY_SOCK) + + csv_path = os.path.join(_log_dir, "telemetry.csv") + state: dict[str, dict] = {} # group -> {key: last_value} + + with open(csv_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["timestamp", "group", "key", "value"]) + f.flush() + + while True: + try: + raw = sock.recv_string() + except zmq.ZMQError: + continue + + try: + pkt = json.loads(raw) + except json.JSONDecodeError: + continue + + ts = pkt.get("ts", 0) + group = pkt.get("group", "") + data = pkt.get("data", {}) + + if group not in state: + state[group] = {} + + prev = state[group] + changed = False + + for key, value in data.items(): + # Convert to string for comparison so floats/ints/bools all diff correctly + str_val = str(value) + if key not in prev or prev[key] != str_val: + writer.writerow([f"{ts:.6f}", group, key, str_val]) + prev[key] = str_val + changed = True + + if changed: + f.flush() + + +if __name__ == "__main__": + main() diff --git a/selfdrive/manager/process_config.py b/selfdrive/manager/process_config.py index 6fe78cb..9a0b218 100755 --- a/selfdrive/manager/process_config.py +++ b/selfdrive/manager/process_config.py @@ -109,6 +109,7 @@ procs = [ # ClearPilot processes NativeProcess("dashcamd", "selfdrive/clearpilot", ["./dashcamd"], dashcam_should_run), + PythonProcess("telemetryd", "selfdrive.clearpilot.telemetryd", always_run), ] managed_processes = {p.name: p for p in procs} \ No newline at end of file