#!/usr/bin/env python3 """ ClearPilot GPS daemon — reads GPS from Quectel EC25 modem via AT commands and publishes gpsLocation messages for locationd/timed. Replaces qcomgpsd which uses the diag interface (broken on this device). """ import math import os import subprocess import time import datetime from cereal import log import cereal.messaging as messaging from openpilot.common.gpio import gpio_init, gpio_set from openpilot.common.params import Params from openpilot.common.swaglog import cloudlog from openpilot.common.time import system_time_valid from openpilot.system.hardware.tici.pins import GPIO def is_daylight(lat: float, lon: float, utc_dt: datetime.datetime) -> bool: """Return True if it's between sunrise and sunset at the given location. Uses NOAA simplified solar position equations. Pure math, no external libs.""" n = utc_dt.timetuple().tm_yday gamma = 2 * math.pi / 365 * (n - 1 + (utc_dt.hour - 12) / 24) # Equation of time (minutes) eqtime = 229.18 * (0.000075 + 0.001868 * math.cos(gamma) - 0.032077 * math.sin(gamma) - 0.014615 * math.cos(2 * gamma) - 0.040849 * math.sin(2 * gamma)) # Solar declination (radians) decl = (0.006918 - 0.399912 * math.cos(gamma) + 0.070257 * math.sin(gamma) - 0.006758 * math.cos(2 * gamma) + 0.000907 * math.sin(2 * gamma) - 0.002697 * math.cos(3 * gamma) + 0.00148 * math.sin(3 * gamma)) lat_rad = math.radians(lat) zenith = math.radians(90.833) cos_ha = (math.cos(zenith) / (math.cos(lat_rad) * math.cos(decl)) - math.tan(lat_rad) * math.tan(decl)) if cos_ha < -1: return True # midnight sun if cos_ha > 1: return False # polar night ha = math.degrees(math.acos(cos_ha)) sunrise_min = 720 - 4 * (lon + ha) - eqtime sunset_min = 720 - 4 * (lon - ha) - eqtime now_min = utc_dt.hour * 60 + utc_dt.minute + utc_dt.second / 60 return sunrise_min <= now_min <= sunset_min def at_cmd(cmd: str) -> str: try: result = subprocess.check_output( f"mmcli -m any --timeout 10 --command='{cmd}'", shell=True, encoding='utf8', stderr=subprocess.DEVNULL ).strip() # mmcli wraps AT responses: response: '+QGPSLOC: ...' # Strip the wrapper to get just the AT response if result.startswith("response: '") and result.endswith("'"): result = result[len("response: '"):-1] return result except subprocess.CalledProcessError: return "" def wait_for_modem(): cloudlog.warning("gpsd: waiting for modem") while True: ret = subprocess.call( "mmcli -m any --timeout 10 --command='AT+QGPS?'", stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True ) if ret == 0: return time.sleep(0.5) def parse_qgpsloc(response: str): """Parse AT+QGPSLOC=2 response into a dict. Format: +QGPSLOC: UTC,lat,lon,hdop,alt,fix,cog,spkm,spkn,date,nsat """ if "+QGPSLOC:" not in response: return None try: data = response.split("+QGPSLOC:")[1].strip() fields = data.split(",") if len(fields) < 11: return None utc = fields[0] # HHMMSS.S lat = float(fields[1]) lon = float(fields[2]) hdop = float(fields[3]) alt = float(fields[4]) fix = int(fields[5]) # 2=2D, 3=3D cog = float(fields[6]) # course over ground spkm = float(fields[7]) # speed km/h spkn = float(fields[8]) # speed knots date = fields[9] # DDMMYY nsat = int(fields[10]) # Build unix timestamp from UTC + date # utc: "HHMMSS.S", date: "DDMMYY" hh, mm = int(utc[0:2]), int(utc[2:4]) ss = float(utc[4:]) dd, mo, yy = int(date[0:2]), int(date[2:4]), 2000 + int(date[4:6]) dt = datetime.datetime(yy, mo, dd, hh, mm, int(ss), int((ss % 1) * 1e6), datetime.timezone.utc) return { "latitude": lat, "longitude": lon, "altitude": alt, "speed": spkm / 3.6, # convert km/h to m/s "bearing": cog, "accuracy": hdop * 5, # rough conversion from HDOP to meters "timestamp_ms": dt.timestamp() * 1e3, "satellites": nsat, "fix": fix, } except (ValueError, IndexError) as e: cloudlog.error(f"gpsd: parse error: {e}") return None def main(): import sys print("gpsd: starting", file=sys.stderr, flush=True) # Kill system gpsd which may respawn and interfere with modem access subprocess.run("pkill -f /usr/sbin/gpsd", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) wait_for_modem() print("gpsd: modem ready", file=sys.stderr, flush=True) # Enable GPS antenna power gpio_init(GPIO.GNSS_PWR_EN, True) gpio_set(GPIO.GNSS_PWR_EN, True) print("gpsd: GPIO power enabled", file=sys.stderr, flush=True) # Don't restart GPS if already running (preserve existing fix) resp = at_cmd("AT+QGPS?") print(f"gpsd: QGPS status: {resp}", file=sys.stderr, flush=True) if "QGPS: 1" not in resp: at_cmd('AT+QGPSCFG="dpoenable",0') at_cmd('AT+QGPSCFG="outport","none"') at_cmd("AT+QGPS=1") print("gpsd: GPS started fresh", file=sys.stderr, flush=True) else: print("gpsd: GPS already running, keeping fix", file=sys.stderr, flush=True) pm = messaging.PubMaster(["gpsLocation"]) clock_set = system_time_valid() params_memory = Params("/dev/shm/params") last_daylight_check = 0.0 daylight_computed = False print("gpsd: entering main loop", file=sys.stderr, flush=True) while True: resp = at_cmd("AT+QGPSLOC=2") fix = parse_qgpsloc(resp) if fix: # Set system clock from GPS on first valid fix if clock is invalid if not clock_set: gps_dt = datetime.datetime.utcfromtimestamp(fix["timestamp_ms"] / 1000) ret = subprocess.run(["date", "-s", gps_dt.strftime("%Y-%m-%d %H:%M:%S")], env={**os.environ, "TZ": "UTC"}, capture_output=True) if ret.returncode == 0: clock_set = True cloudlog.warning("gpsd: system clock set from GPS: %s", gps_dt) print(f"gpsd: system clock set from GPS: {gps_dt}", file=sys.stderr, flush=True) else: cloudlog.error("gpsd: failed to set clock: %s", ret.stderr.decode().strip()) msg = messaging.new_message("gpsLocation", valid=True) gps = msg.gpsLocation gps.latitude = fix["latitude"] gps.longitude = fix["longitude"] gps.altitude = fix["altitude"] gps.speed = fix["speed"] gps.bearingDeg = fix["bearing"] gps.horizontalAccuracy = fix["accuracy"] gps.unixTimestampMillis = int(fix["timestamp_ms"]) gps.source = log.GpsLocationData.SensorSource.qcomdiag gps.hasFix = fix["fix"] >= 2 gps.flags = 1 gps.vNED = [0.0, 0.0, 0.0] gps.verticalAccuracy = fix["accuracy"] gps.bearingAccuracyDeg = 10.0 gps.speedAccuracy = 1.0 pm.send("gpsLocation", msg) # CLEARPILOT: daylight calculation for auto display mode switching if clock_set: now_mono = time.monotonic() interval = 1.0 if not daylight_computed else 30.0 if (now_mono - last_daylight_check) >= interval: last_daylight_check = now_mono utc_now = datetime.datetime.utcfromtimestamp(fix["timestamp_ms"] / 1000) daylight = is_daylight(fix["latitude"], fix["longitude"], utc_now) params_memory.put_bool("IsDaylight", daylight) if not daylight_computed: daylight_computed = True cloudlog.warning("gpsd: initial daylight calc: %s", "day" if daylight else "night") print(f"gpsd: initial daylight calc: {'day' if daylight else 'night'}", file=sys.stderr, flush=True) # Auto-transition: only touch states 0 and 1 current_mode = params_memory.get_int("ScreenDisplayMode") if current_mode == 0 and not daylight: params_memory.put_int("ScreenDisplayMode", 1) cloudlog.warning("gpsd: auto-switch to nightrider (sunset)") elif current_mode == 1 and daylight: params_memory.put_int("ScreenDisplayMode", 0) cloudlog.warning("gpsd: auto-switch to normal (sunrise)") time.sleep(1.0) # 1 Hz polling if __name__ == "__main__": main()