#!/usr/bin/env python3 """ UDP receiver for LIN sniffer/master stream. Purpose - Capture all UDP datagrams from the ESP and write a CSV for later diffing. - Print a live summary to the console. - Optionally parse lines that contain textual "PID XX" + hex bytes and emit only changes per PID (useful when the stream is verbose). This is intentionally tolerant about payload format: it records raw hex and, if the payload looks textual, the decoded text as well. Examples # Basic recording on port 5555 python udp_lin_receiver.py --port 5555 --out capture.csv # Only show changed payloads per PID (when textual lines include "PID ..") python udp_lin_receiver.py --port 5555 --out capture.csv --pid-diff # Quiet mode (only CSV), capture for 5 minutes python udp_lin_receiver.py --quiet --duration 300 --out capture.csv """ from __future__ import annotations import argparse import csv import datetime as dt import os import re import socket import sys import time from typing import Dict, Optional, Tuple def is_mostly_text(b: bytes, threshold: float = 0.9) -> bool: if not b: return True printable = sum((32 <= c < 127) or c in (9, 10, 13) for c in b) return printable / len(b) >= threshold def to_hex(b: bytes) -> str: return " ".join(f"{x:02X}" for x in b) PID_LINE_RE = re.compile(r"PID\s+([0-9A-Fa-f]{2}).*?([0-9A-Fa-f]{2}(?:\s+[0-9A-Fa-f]{2})*)") def maybe_parse_pid_and_data(text: str) -> Optional[Tuple[int, Tuple[int, ...]]]: """Parse lines like: 'PID 20 12 34 56 78 ...' -> (pid, (bytes...)) Returns None if no clear match. """ m = PID_LINE_RE.search(text) if not m: return None try: pid = int(m.group(1), 16) data_hex = m.group(2) data = tuple(int(x, 16) for x in data_hex.split()) return pid, data except Exception: return None def now_iso() -> str: return dt.datetime.now(dt.timezone.utc).astimezone().isoformat(timespec="milliseconds") def run(): ap = argparse.ArgumentParser(description="UDP LIN receiver") ap.add_argument("--host", default="0.0.0.0", help="Listen host (default: 0.0.0.0)") ap.add_argument("--port", type=int, default=5555, help="Listen port (default: 5555)") ap.add_argument("--out", default="udp_capture.csv", help="CSV output file path") ap.add_argument("--append", action="store_true", help="Append to CSV if exists") ap.add_argument("--duration", type=float, default=0.0, help="Stop after N seconds (0 = no limit)") ap.add_argument("--quiet", action="store_true", help="No console output, only CSV") ap.add_argument("--pid-diff", action="store_true", help="Track textual 'PID ..' lines and only print when data changes") ap.add_argument("--buffer", type=int, default=4096, help="Receive buffer size (default: 4096)") args = ap.parse_args() # Prepare CSV out_exists = os.path.exists(args.out) csv_mode = "a" if args.append else "w" with open(args.out, csv_mode, newline="", encoding="utf-8") as f: w = csv.writer(f) if not out_exists or not args.append: w.writerow(["ts", "src_ip", "src_port", "size", "is_text", "text", "hex"]) # header # Socket sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((args.host, args.port)) sock.settimeout(1.0) if not args.quiet: print(f"Listening on {args.host}:{args.port} → {args.out}") if args.pid_diff: print("PID diff mode: will summarize changes per PID when textual lines include 'PID ..' and hex bytes") start = time.time() last_by_pid: Dict[int, Tuple[int, ...]] = {} while True: if args.duration and (time.time() - start) >= args.duration: if not args.quiet: print("Duration reached; exiting.") break try: data, addr = sock.recvfrom(args.buffer) except socket.timeout: continue ts = now_iso() src_ip, src_port = addr[0], addr[1] size = len(data) as_text = "" is_text = False try: if is_mostly_text(data): as_text = data.decode("utf-8", errors="replace").strip() is_text = True except Exception: pass hexstr = to_hex(data) w.writerow([ts, src_ip, src_port, size, int(is_text), as_text, hexstr]) # Console output if not args.quiet: if args.pid_diff and is_text: parsed = maybe_parse_pid_and_data(as_text) if parsed: pid, pdata = parsed last = last_by_pid.get(pid) if last != pdata: last_by_pid[pid] = pdata print(f"{ts} PID {pid:02X} len={len(pdata)} changed: { ' '.join(f'{b:02X}' for b in pdata) }") continue # default console print if is_text and as_text: print(f"{ts} {src_ip}:{src_port} ({size}B) TEXT: {as_text}") else: print(f"{ts} {src_ip}:{src_port} ({size}B) HEX: {hexstr}") if __name__ == "__main__": try: run() except KeyboardInterrupt: pass