Files
clearpilot/system/clearpilot/gpsd.py
Brian Hanson 5b91dac33e 5-state display mode: auto day/night, nightrider, screen off
New ScreenDisplayMode param (fixes ScreenDisaplayMode typo):
  0=auto-normal, 1=auto-nightrider, 2=normal, 3=screen-off, 4=nightrider

Nightrider mode: black background (no camera feed), path/lane lines
drawn as 2px outlines only. Auto mode uses NOAA solar position calc
in gpsd to switch between day/night based on GPS lat/lon and UTC time.
First calc on GPS fix, then every 30 seconds.

Button cycle onroad: 0→4, 1→2, 2→3, 3→4, 4→2 (never back to auto).
Offroad: any→3, 3→0. bench_cmd debugbutton simulates the button press.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 06:49:38 +00:00

229 lines
8.1 KiB
Python

#!/usr/bin/env python3
"""
ClearPilot GPS daemon — reads GPS from Quectel EC25 modem via AT commands
and publishes gpsLocation messages for locationd/timed.
Replaces qcomgpsd which uses the diag interface (broken on this device).
"""
import math
import os
import subprocess
import time
import datetime
from cereal import log
import cereal.messaging as messaging
from openpilot.common.gpio import gpio_init, gpio_set
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.common.time import system_time_valid
from openpilot.system.hardware.tici.pins import GPIO
def is_daylight(lat: float, lon: float, utc_dt: datetime.datetime) -> bool:
"""Return True if it's between sunrise and sunset at the given location.
Uses NOAA simplified solar position equations. Pure math, no external libs."""
n = utc_dt.timetuple().tm_yday
gamma = 2 * math.pi / 365 * (n - 1 + (utc_dt.hour - 12) / 24)
# Equation of time (minutes)
eqtime = 229.18 * (0.000075 + 0.001868 * math.cos(gamma)
- 0.032077 * math.sin(gamma)
- 0.014615 * math.cos(2 * gamma)
- 0.040849 * math.sin(2 * gamma))
# Solar declination (radians)
decl = (0.006918 - 0.399912 * math.cos(gamma)
+ 0.070257 * math.sin(gamma)
- 0.006758 * math.cos(2 * gamma)
+ 0.000907 * math.sin(2 * gamma)
- 0.002697 * math.cos(3 * gamma)
+ 0.00148 * math.sin(3 * gamma))
lat_rad = math.radians(lat)
zenith = math.radians(90.833)
cos_ha = (math.cos(zenith) / (math.cos(lat_rad) * math.cos(decl))
- math.tan(lat_rad) * math.tan(decl))
if cos_ha < -1:
return True # midnight sun
if cos_ha > 1:
return False # polar night
ha = math.degrees(math.acos(cos_ha))
sunrise_min = 720 - 4 * (lon + ha) - eqtime
sunset_min = 720 - 4 * (lon - ha) - eqtime
now_min = utc_dt.hour * 60 + utc_dt.minute + utc_dt.second / 60
return sunrise_min <= now_min <= sunset_min
def at_cmd(cmd: str) -> str:
try:
result = subprocess.check_output(
f"mmcli -m any --timeout 10 --command='{cmd}'",
shell=True, encoding='utf8', stderr=subprocess.DEVNULL
).strip()
# mmcli wraps AT responses: response: '+QGPSLOC: ...'
# Strip the wrapper to get just the AT response
if result.startswith("response: '") and result.endswith("'"):
result = result[len("response: '"):-1]
return result
except subprocess.CalledProcessError:
return ""
def wait_for_modem():
cloudlog.warning("gpsd: waiting for modem")
while True:
ret = subprocess.call(
"mmcli -m any --timeout 10 --command='AT+QGPS?'",
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True
)
if ret == 0:
return
time.sleep(0.5)
def parse_qgpsloc(response: str):
"""Parse AT+QGPSLOC=2 response into a dict.
Format: +QGPSLOC: UTC,lat,lon,hdop,alt,fix,cog,spkm,spkn,date,nsat
"""
if "+QGPSLOC:" not in response:
return None
try:
data = response.split("+QGPSLOC:")[1].strip()
fields = data.split(",")
if len(fields) < 11:
return None
utc = fields[0] # HHMMSS.S
lat = float(fields[1])
lon = float(fields[2])
hdop = float(fields[3])
alt = float(fields[4])
fix = int(fields[5]) # 2=2D, 3=3D
cog = float(fields[6]) # course over ground
spkm = float(fields[7]) # speed km/h
spkn = float(fields[8]) # speed knots
date = fields[9] # DDMMYY
nsat = int(fields[10])
# Build unix timestamp from UTC + date
# utc: "HHMMSS.S", date: "DDMMYY"
hh, mm = int(utc[0:2]), int(utc[2:4])
ss = float(utc[4:])
dd, mo, yy = int(date[0:2]), int(date[2:4]), 2000 + int(date[4:6])
dt = datetime.datetime(yy, mo, dd, hh, mm, int(ss),
int((ss % 1) * 1e6), datetime.timezone.utc)
return {
"latitude": lat,
"longitude": lon,
"altitude": alt,
"speed": spkm / 3.6, # convert km/h to m/s
"bearing": cog,
"accuracy": hdop * 5, # rough conversion from HDOP to meters
"timestamp_ms": dt.timestamp() * 1e3,
"satellites": nsat,
"fix": fix,
}
except (ValueError, IndexError) as e:
cloudlog.error(f"gpsd: parse error: {e}")
return None
def main():
import sys
print("gpsd: starting", file=sys.stderr, flush=True)
# Kill system gpsd which may respawn and interfere with modem access
subprocess.run("pkill -f /usr/sbin/gpsd", shell=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
wait_for_modem()
print("gpsd: modem ready", file=sys.stderr, flush=True)
# Enable GPS antenna power
gpio_init(GPIO.GNSS_PWR_EN, True)
gpio_set(GPIO.GNSS_PWR_EN, True)
print("gpsd: GPIO power enabled", file=sys.stderr, flush=True)
# Don't restart GPS if already running (preserve existing fix)
resp = at_cmd("AT+QGPS?")
print(f"gpsd: QGPS status: {resp}", file=sys.stderr, flush=True)
if "QGPS: 1" not in resp:
at_cmd('AT+QGPSCFG="dpoenable",0')
at_cmd('AT+QGPSCFG="outport","none"')
at_cmd("AT+QGPS=1")
print("gpsd: GPS started fresh", file=sys.stderr, flush=True)
else:
print("gpsd: GPS already running, keeping fix", file=sys.stderr, flush=True)
pm = messaging.PubMaster(["gpsLocation"])
clock_set = system_time_valid()
params_memory = Params("/dev/shm/params")
last_daylight_check = 0.0
daylight_computed = False
print("gpsd: entering main loop", file=sys.stderr, flush=True)
while True:
resp = at_cmd("AT+QGPSLOC=2")
fix = parse_qgpsloc(resp)
if fix:
# Set system clock from GPS on first valid fix if clock is invalid
if not clock_set:
gps_dt = datetime.datetime.utcfromtimestamp(fix["timestamp_ms"] / 1000)
ret = subprocess.run(["date", "-s", gps_dt.strftime("%Y-%m-%d %H:%M:%S")],
env={**os.environ, "TZ": "UTC"},
capture_output=True)
if ret.returncode == 0:
clock_set = True
cloudlog.warning("gpsd: system clock set from GPS: %s", gps_dt)
print(f"gpsd: system clock set from GPS: {gps_dt}", file=sys.stderr, flush=True)
else:
cloudlog.error("gpsd: failed to set clock: %s", ret.stderr.decode().strip())
msg = messaging.new_message("gpsLocation", valid=True)
gps = msg.gpsLocation
gps.latitude = fix["latitude"]
gps.longitude = fix["longitude"]
gps.altitude = fix["altitude"]
gps.speed = fix["speed"]
gps.bearingDeg = fix["bearing"]
gps.horizontalAccuracy = fix["accuracy"]
gps.unixTimestampMillis = int(fix["timestamp_ms"])
gps.source = log.GpsLocationData.SensorSource.qcomdiag
gps.hasFix = fix["fix"] >= 2
gps.flags = 1
gps.vNED = [0.0, 0.0, 0.0]
gps.verticalAccuracy = fix["accuracy"]
gps.bearingAccuracyDeg = 10.0
gps.speedAccuracy = 1.0
pm.send("gpsLocation", msg)
# CLEARPILOT: daylight calculation for auto display mode switching
if clock_set:
now_mono = time.monotonic()
interval = 1.0 if not daylight_computed else 30.0
if (now_mono - last_daylight_check) >= interval:
last_daylight_check = now_mono
utc_now = datetime.datetime.utcfromtimestamp(fix["timestamp_ms"] / 1000)
daylight = is_daylight(fix["latitude"], fix["longitude"], utc_now)
params_memory.put_bool("IsDaylight", daylight)
if not daylight_computed:
daylight_computed = True
cloudlog.warning("gpsd: initial daylight calc: %s", "day" if daylight else "night")
print(f"gpsd: initial daylight calc: {'day' if daylight else 'night'}", file=sys.stderr, flush=True)
# Auto-transition: only touch states 0 and 1
current_mode = params_memory.get_int("ScreenDisplayMode")
if current_mode == 0 and not daylight:
params_memory.put_int("ScreenDisplayMode", 1)
cloudlog.warning("gpsd: auto-switch to nightrider (sunset)")
elif current_mode == 1 and daylight:
params_memory.put_int("ScreenDisplayMode", 0)
cloudlog.warning("gpsd: auto-switch to normal (sunrise)")
time.sleep(1.0) # 1 Hz polling
if __name__ == "__main__":
main()